HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe)

(cherry picked from commit 75a326aaff)

(cherry picked from commit 06552a15d5)
This commit is contained in:
Colin Patrick Mccabe 2014-11-25 17:44:34 -08:00 committed by Vinod Kumar Vavilapalli
parent 8ed162bcbf
commit 4363145128
11 changed files with 477 additions and 351 deletions

View File

@ -156,6 +156,9 @@ Release 2.6.1 - UNRELEASED
HDFS-7980. Incremental BlockReport will dramatically slow down namenode
startup. (Walter Su via szetszwo)
HDFS-7446. HDFS inotify should have the ability to determine what txid it
has read up to (cmccabe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -19,11 +19,10 @@
package org.apache.hadoop.hdfs;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
@ -33,13 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Stream for reading inotify events. DFSInotifyEventInputStreams should not
@ -52,7 +45,7 @@ public class DFSInotifyEventInputStream {
.class);
private final ClientProtocol namenode;
private Iterator<Event> it;
private Iterator<EventBatch> it;
private long lastReadTxid;
/**
* The most recent txid the NameNode told us it has sync'ed -- helps us
@ -78,22 +71,22 @@ public class DFSInotifyEventInputStream {
}
/**
* Returns the next event in the stream or null if no new events are currently
* available.
* Returns the next batch of events in the stream or null if no new
* batches are currently available.
*
* @throws IOException because of network error or edit log
* corruption. Also possible if JournalNodes are unresponsive in the
* QJM setting (even one unresponsive JournalNode is enough in rare cases),
* so catching this exception and retrying at least a few times is
* recommended.
* @throws MissingEventsException if we cannot return the next event in the
* stream because the data for the event (and possibly some subsequent events)
* has been deleted (generally because this stream is a very large number of
* events behind the current state of the NameNode). It is safe to continue
* reading from the stream after this exception is thrown -- the next
* available event will be returned.
* @throws MissingEventsException if we cannot return the next batch in the
* stream because the data for the events (and possibly some subsequent
* events) has been deleted (generally because this stream is a very large
* number of transactions behind the current state of the NameNode). It is
* safe to continue reading from the stream after this exception is thrown
* The next available batch of events will be returned.
*/
public Event poll() throws IOException, MissingEventsException {
public EventBatch poll() throws IOException, MissingEventsException {
// need to keep retrying until the NN sends us the latest committed txid
if (lastReadTxid == -1) {
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
@ -101,14 +94,14 @@ public class DFSInotifyEventInputStream {
return null;
}
if (!it.hasNext()) {
EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
it = el.getEvents().iterator();
it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
@ -131,18 +124,18 @@ public class DFSInotifyEventInputStream {
}
/**
* Return a estimate of how many events behind the NameNode's current state
* this stream is. Clients should periodically call this method and check if
* its result is steadily increasing, which indicates that they are falling
* behind (i.e. events are being generated faster than the client is reading
* them). If a client falls too far behind events may be deleted before the
* client can read them.
* Return a estimate of how many transaction IDs behind the NameNode's
* current state this stream is. Clients should periodically call this method
* and check if its result is steadily increasing, which indicates that they
* are falling behind (i.e. transaction are being generated faster than the
* client is reading them). If a client falls too far behind events may be
* deleted before the client can read them.
* <p/>
* A return value of -1 indicates that an estimate could not be produced, and
* should be ignored. The value returned by this method is really only useful
* when compared to previous or subsequent returned values.
*/
public long getEventsBehindEstimate() {
public long getTxidsBehindEstimate() {
if (syncTxid == 0) {
return -1;
} else {
@ -155,8 +148,8 @@ public class DFSInotifyEventInputStream {
}
/**
* Returns the next event in the stream, waiting up to the specified amount of
* time for a new event. Returns null if a new event is not available at the
* Returns the next event batch in the stream, waiting up to the specified
* amount of time for a new batch. Returns null if one is not available at the
* end of the specified amount of time. The time before the method returns may
* exceed the specified amount of time by up to the time required for an RPC
* to the NameNode.
@ -168,12 +161,12 @@ public class DFSInotifyEventInputStream {
* see {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
public Event poll(long time, TimeUnit tu) throws IOException,
public EventBatch poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException {
long initialTime = Time.monotonicNow();
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS;
Event next = null;
EventBatch next = null;
while ((next = poll()) == null) {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
if (timeLeft <= 0) {
@ -193,17 +186,17 @@ public class DFSInotifyEventInputStream {
}
/**
* Returns the next event in the stream, waiting indefinitely if a new event
* is not immediately available.
* Returns the next batch of events in the stream, waiting indefinitely if
* a new batch is not immediately available.
*
* @throws IOException see {@link DFSInotifyEventInputStream#poll()}
* @throws MissingEventsException see
* {@link DFSInotifyEventInputStream#poll()}
* @throws InterruptedException if the calling thread is interrupted
*/
public Event take() throws IOException, InterruptedException,
public EventBatch take() throws IOException, InterruptedException,
MissingEventsException {
Event next = null;
EventBatch next = null;
int nextWaitMin = INITIAL_WAIT_MS;
while ((next = poll()) == null) {
// sleep for a random period between nextWaitMin and nextWaitMin * 2

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.inotify;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* A batch of events that all happened on the same transaction ID.
*/
@InterfaceAudience.Public
public class EventBatch {
private final long txid;
private final Event[] events;
public EventBatch(long txid, Event[] events) {
this.txid = txid;
this.events = events;
}
public long getTxid() {
return txid;
}
public Event[] getEvents() { return events; }
}

View File

@ -23,30 +23,30 @@ import org.apache.hadoop.classification.InterfaceAudience;
import java.util.List;
/**
* Contains a set of events, the transaction ID in the edit log up to which we
* read to produce these events, and the first txid we observed when producing
* these events (the last of which is for the purpose of determining whether we
* have missed events due to edit deletion). Also contains the most recent txid
* that the NameNode has sync'ed, so the client can determine how far behind in
* the edit log it is.
* Contains a list of event batches, the transaction ID in the edit log up to
* which we read to produce these events, and the first txid we observed when
* producing these events (the last of which is for the purpose of determining
* whether we have missed events due to edit deletion). Also contains the most
* recent txid that the NameNode has sync'ed, so the client can determine how
* far behind in the edit log it is.
*/
@InterfaceAudience.Private
public class EventsList {
private List<Event> events;
public class EventBatchList {
private List<EventBatch> batches;
private long firstTxid;
private long lastTxid;
private long syncTxid;
public EventsList(List<Event> events, long firstTxid, long lastTxid,
long syncTxid) {
this.events = events;
public EventBatchList(List<EventBatch> batches, long firstTxid,
long lastTxid, long syncTxid) {
this.batches = batches;
this.firstTxid = firstTxid;
this.lastTxid = lastTxid;
this.syncTxid = syncTxid;
}
public List<Event> getEvents() {
return events;
public List<EventBatch> getBatches() {
return batches;
}
public long getFirstTxid() {

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -1405,9 +1405,9 @@ public interface ClientProtocol {
public long getCurrentEditLogTxid() throws IOException;
/**
* Get an ordered list of events corresponding to the edit log transactions
* from txid onwards.
* Get an ordered list of batches of events corresponding to the edit log
* transactions for txids equal to or greater than txid.
*/
@Idempotent
public EventsList getEditsFromTxid(long txid) throws IOException;
public EventBatchList getEditsFromTxid(long txid) throws IOException;
}

View File

@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@ -1480,7 +1480,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public EventsList getEditsFromTxid(long txid) throws IOException {
public EventBatchList getEditsFromTxid(long txid) throws IOException {
GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
.setTxid(txid).build();
try {

View File

@ -46,11 +46,12 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -2516,173 +2517,197 @@ public class PBHelper {
}
}
public static EventsList convert(GetEditsFromTxidResponseProto resp) throws
public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws
IOException {
List<Event> events = Lists.newArrayList();
for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) {
switch(p.getType()) {
case EVENT_CLOSE:
InotifyProtos.CloseEventProto close =
InotifyProtos.CloseEventProto.parseFrom(p.getContents());
events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(),
close.getTimestamp()));
break;
case EVENT_CREATE:
InotifyProtos.CreateEventProto create =
InotifyProtos.CreateEventProto.parseFrom(p.getContents());
events.add(new Event.CreateEvent.Builder()
.iNodeType(createTypeConvert(create.getType()))
.path(create.getPath())
.ctime(create.getCtime())
.ownerName(create.getOwnerName())
.groupName(create.getGroupName())
.perms(convert(create.getPerms()))
.replication(create.getReplication())
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
create.getSymlinkTarget())
.overwrite(create.getOverwrite()).build());
break;
case EVENT_METADATA:
InotifyProtos.MetadataUpdateEventProto meta =
InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
events.add(new Event.MetadataUpdateEvent.Builder()
.path(meta.getPath())
.metadataType(metadataUpdateTypeConvert(meta.getType()))
.mtime(meta.getMtime())
.atime(meta.getAtime())
.replication(meta.getReplication())
.ownerName(
meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
.groupName(
meta.getGroupName().isEmpty() ? null : meta.getGroupName())
.perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
.acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
meta.getAclsList()))
.xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
meta.getXAttrsList()))
.xAttrsRemoved(meta.getXAttrsRemoved())
.build());
break;
case EVENT_RENAME:
InotifyProtos.RenameEventProto rename =
InotifyProtos.RenameEventProto.parseFrom(p.getContents());
events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(),
rename.getTimestamp()));
break;
case EVENT_APPEND:
InotifyProtos.AppendEventProto reopen =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
events.add(new Event.AppendEvent(reopen.getPath()));
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp()));
break;
default:
throw new RuntimeException("Unexpected inotify event type: " +
p.getType());
}
final InotifyProtos.EventsListProto list = resp.getEventsList();
final long firstTxid = list.getFirstTxid();
final long lastTxid = list.getLastTxid();
List<EventBatch> batches = Lists.newArrayList();
if (list.getEventsList().size() > 0) {
throw new IOException("Can't handle old inotify server response.");
}
return new EventsList(events, resp.getEventsList().getFirstTxid(),
for (InotifyProtos.EventBatchProto bp : list.getBatchList()) {
long txid = bp.getTxid();
if ((txid != -1) && ((txid < firstTxid) || (txid > lastTxid))) {
throw new IOException("Error converting TxidResponseProto: got a " +
"transaction id " + txid + " that was outside the range of [" +
firstTxid + ", " + lastTxid + "].");
}
List<Event> events = Lists.newArrayList();
for (InotifyProtos.EventProto p : bp.getEventsList()) {
switch (p.getType()) {
case EVENT_CLOSE:
InotifyProtos.CloseEventProto close =
InotifyProtos.CloseEventProto.parseFrom(p.getContents());
events.add(new Event.CloseEvent(close.getPath(),
close.getFileSize(), close.getTimestamp()));
break;
case EVENT_CREATE:
InotifyProtos.CreateEventProto create =
InotifyProtos.CreateEventProto.parseFrom(p.getContents());
events.add(new Event.CreateEvent.Builder()
.iNodeType(createTypeConvert(create.getType()))
.path(create.getPath())
.ctime(create.getCtime())
.ownerName(create.getOwnerName())
.groupName(create.getGroupName())
.perms(convert(create.getPerms()))
.replication(create.getReplication())
.symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
create.getSymlinkTarget())
.overwrite(create.getOverwrite()).build());
break;
case EVENT_METADATA:
InotifyProtos.MetadataUpdateEventProto meta =
InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
events.add(new Event.MetadataUpdateEvent.Builder()
.path(meta.getPath())
.metadataType(metadataUpdateTypeConvert(meta.getType()))
.mtime(meta.getMtime())
.atime(meta.getAtime())
.replication(meta.getReplication())
.ownerName(
meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
.groupName(
meta.getGroupName().isEmpty() ? null : meta.getGroupName())
.perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
.acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
meta.getAclsList()))
.xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
meta.getXAttrsList()))
.xAttrsRemoved(meta.getXAttrsRemoved())
.build());
break;
case EVENT_RENAME:
InotifyProtos.RenameEventProto rename =
InotifyProtos.RenameEventProto.parseFrom(p.getContents());
events.add(new Event.RenameEvent(rename.getSrcPath(),
rename.getDestPath(), rename.getTimestamp()));
break;
case EVENT_APPEND:
InotifyProtos.AppendEventProto reopen =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
events.add(new Event.AppendEvent(reopen.getPath()));
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
events.add(new Event.UnlinkEvent(unlink.getPath(),
unlink.getTimestamp()));
break;
default:
throw new RuntimeException("Unexpected inotify event type: " +
p.getType());
}
}
batches.add(new EventBatch(txid, events.toArray(new Event[0])));
}
return new EventBatchList(batches, resp.getEventsList().getFirstTxid(),
resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
}
public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) {
public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList el) {
InotifyProtos.EventsListProto.Builder builder =
InotifyProtos.EventsListProto.newBuilder();
for (Event e : el.getEvents()) {
switch(e.getEventType()) {
case CLOSE:
Event.CloseEvent ce = (Event.CloseEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CLOSE)
.setContents(
InotifyProtos.CloseEventProto.newBuilder()
.setPath(ce.getPath())
.setFileSize(ce.getFileSize())
.setTimestamp(ce.getTimestamp()).build().toByteString()
).build());
break;
case CREATE:
Event.CreateEvent ce2 = (Event.CreateEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CREATE)
.setContents(
InotifyProtos.CreateEventProto.newBuilder()
.setType(createTypeConvert(ce2.getiNodeType()))
.setPath(ce2.getPath())
.setCtime(ce2.getCtime())
.setOwnerName(ce2.getOwnerName())
.setGroupName(ce2.getGroupName())
.setPerms(convert(ce2.getPerms()))
.setReplication(ce2.getReplication())
.setSymlinkTarget(ce2.getSymlinkTarget() == null ?
"" : ce2.getSymlinkTarget())
.setOverwrite(ce2.getOverwrite()).build().toByteString()
).build());
break;
case METADATA:
Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
InotifyProtos.MetadataUpdateEventProto.Builder metaB =
InotifyProtos.MetadataUpdateEventProto.newBuilder()
.setPath(me.getPath())
.setType(metadataUpdateTypeConvert(me.getMetadataType()))
.setMtime(me.getMtime())
.setAtime(me.getAtime())
.setReplication(me.getReplication())
.setOwnerName(me.getOwnerName() == null ? "" :
me.getOwnerName())
.setGroupName(me.getGroupName() == null ? "" :
me.getGroupName())
.addAllAcls(me.getAcls() == null ?
Lists.<AclEntryProto>newArrayList() :
convertAclEntryProto(me.getAcls()))
.addAllXAttrs(me.getxAttrs() == null ?
Lists.<XAttrProto>newArrayList() :
convertXAttrProto(me.getxAttrs()))
.setXAttrsRemoved(me.isxAttrsRemoved());
if (me.getPerms() != null) {
metaB.setPerms(convert(me.getPerms()));
for (EventBatch b : el.getBatches()) {
List<InotifyProtos.EventProto> events = Lists.newArrayList();
for (Event e : b.getEvents()) {
switch (e.getEventType()) {
case CLOSE:
Event.CloseEvent ce = (Event.CloseEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CLOSE)
.setContents(
InotifyProtos.CloseEventProto.newBuilder()
.setPath(ce.getPath())
.setFileSize(ce.getFileSize())
.setTimestamp(ce.getTimestamp()).build().toByteString()
).build());
break;
case CREATE:
Event.CreateEvent ce2 = (Event.CreateEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_CREATE)
.setContents(
InotifyProtos.CreateEventProto.newBuilder()
.setType(createTypeConvert(ce2.getiNodeType()))
.setPath(ce2.getPath())
.setCtime(ce2.getCtime())
.setOwnerName(ce2.getOwnerName())
.setGroupName(ce2.getGroupName())
.setPerms(convert(ce2.getPerms()))
.setReplication(ce2.getReplication())
.setSymlinkTarget(ce2.getSymlinkTarget() == null ?
"" : ce2.getSymlinkTarget())
.setOverwrite(ce2.getOverwrite()).build().toByteString()
).build());
break;
case METADATA:
Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
InotifyProtos.MetadataUpdateEventProto.Builder metaB =
InotifyProtos.MetadataUpdateEventProto.newBuilder()
.setPath(me.getPath())
.setType(metadataUpdateTypeConvert(me.getMetadataType()))
.setMtime(me.getMtime())
.setAtime(me.getAtime())
.setReplication(me.getReplication())
.setOwnerName(me.getOwnerName() == null ? "" :
me.getOwnerName())
.setGroupName(me.getGroupName() == null ? "" :
me.getGroupName())
.addAllAcls(me.getAcls() == null ?
Lists.<AclEntryProto>newArrayList() :
convertAclEntryProto(me.getAcls()))
.addAllXAttrs(me.getxAttrs() == null ?
Lists.<XAttrProto>newArrayList() :
convertXAttrProto(me.getxAttrs()))
.setXAttrsRemoved(me.isxAttrsRemoved());
if (me.getPerms() != null) {
metaB.setPerms(convert(me.getPerms()));
}
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_METADATA)
.setContents(metaB.build().toByteString())
.build());
break;
case RENAME:
Event.RenameEvent re = (Event.RenameEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_RENAME)
.setContents(
InotifyProtos.RenameEventProto.newBuilder()
.setSrcPath(re.getSrcPath())
.setDestPath(re.getDstPath())
.setTimestamp(re.getTimestamp()).build().toByteString()
).build());
break;
case APPEND:
Event.AppendEvent re2 = (Event.AppendEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
.setContents(
InotifyProtos.AppendEventProto.newBuilder()
.setPath(re2.getPath()).build().toByteString()
).build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_UNLINK)
.setContents(
InotifyProtos.UnlinkEventProto.newBuilder()
.setPath(ue.getPath())
.setTimestamp(ue.getTimestamp()).build().toByteString()
).build());
break;
default:
throw new RuntimeException("Unexpected inotify event: " + e);
}
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_METADATA)
.setContents(metaB.build().toByteString())
.build());
break;
case RENAME:
Event.RenameEvent re = (Event.RenameEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_RENAME)
.setContents(
InotifyProtos.RenameEventProto.newBuilder()
.setSrcPath(re.getSrcPath())
.setDestPath(re.getDstPath())
.setTimestamp(re.getTimestamp()).build().toByteString()
).build());
break;
case APPEND:
Event.AppendEvent re2 = (Event.AppendEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
.setContents(
InotifyProtos.AppendEventProto.newBuilder()
.setPath(re2.getPath()).build().toByteString()
).build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
builder.addEvents(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_UNLINK)
.setContents(
InotifyProtos.UnlinkEventProto.newBuilder()
.setPath(ue.getPath())
.setTimestamp(ue.getTimestamp()).build().toByteString()
).build());
break;
default:
throw new RuntimeException("Unexpected inotify event: " + e);
}
builder.addBatch(InotifyProtos.EventBatchProto.newBuilder().
setTxid(b.getTxid()).
addAllEvents(events));
}
builder.setFirstTxid(el.getFirstTxid());
builder.setLastTxid(el.getLastTxid());

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.protocol.Block;
import java.util.List;
@ -39,32 +40,35 @@ public class InotifyFSEditLogOpTranslator {
return size;
}
public static Event[] translate(FSEditLogOp op) {
public static EventBatch translate(FSEditLogOp op) {
switch(op.opCode) {
case OP_ADD:
FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
if (addOp.blocks.length == 0) { // create
return new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
return new EventBatch(op.txid,
new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
.ctime(addOp.atime)
.replication(addOp.replication)
.ownerName(addOp.permissions.getUserName())
.groupName(addOp.permissions.getGroupName())
.perms(addOp.permissions.getPermission())
.overwrite(addOp.overwrite)
.iNodeType(Event.CreateEvent.INodeType.FILE).build() };
.iNodeType(Event.CreateEvent.INodeType.FILE).build() });
} else {
return new Event[] { new Event.AppendEvent(addOp.path) };
return new EventBatch(op.txid,
new Event[] { new Event.AppendEvent(addOp.path) });
}
case OP_CLOSE:
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
return new Event[] {
new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) };
return new EventBatch(op.txid, new Event[] {
new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
.path(setRepOp.path)
.replication(setRepOp.replication).build() };
.replication(setRepOp.replication).build() });
case OP_CONCAT_DELETE:
FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
List<Event> events = Lists.newArrayList();
@ -73,73 +77,83 @@ public class InotifyFSEditLogOpTranslator {
events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
}
events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
return events.toArray(new Event[0]);
return new EventBatch(op.txid, events.toArray(new Event[0]));
case OP_RENAME_OLD:
FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
return new Event[] {
new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) };
return new EventBatch(op.txid, new Event[] {
new Event.RenameEvent(rnOpOld.src,
rnOpOld.dst, rnOpOld.timestamp) });
case OP_RENAME:
FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
return new Event[] {
new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) };
return new EventBatch(op.txid, new Event[] {
new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) });
case OP_DELETE:
FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) };
return new EventBatch(op.txid, new Event[] {
new Event.UnlinkEvent(delOp.path, delOp.timestamp) });
case OP_MKDIR:
FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
return new EventBatch(op.txid,
new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
.ctime(mkOp.timestamp)
.ownerName(mkOp.permissions.getUserName())
.groupName(mkOp.permissions.getGroupName())
.perms(mkOp.permissions.getPermission())
.iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() };
.iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() });
case OP_SET_PERMISSIONS:
FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
.path(permOp.src)
.perms(permOp.permissions).build() };
.perms(permOp.permissions).build() });
case OP_SET_OWNER:
FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
.path(ownOp.src)
.ownerName(ownOp.username).groupName(ownOp.groupname).build() };
.ownerName(ownOp.username).groupName(ownOp.groupname).build() });
case OP_TIMES:
FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
.path(timesOp.path)
.atime(timesOp.atime).mtime(timesOp.mtime).build() };
.atime(timesOp.atime).mtime(timesOp.mtime).build() });
case OP_SYMLINK:
FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
return new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
return new EventBatch(op.txid,
new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
.ctime(symOp.atime)
.ownerName(symOp.permissionStatus.getUserName())
.groupName(symOp.permissionStatus.getGroupName())
.perms(symOp.permissionStatus.getPermission())
.symlinkTarget(symOp.value)
.iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() };
.iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() });
case OP_REMOVE_XATTR:
FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(rxOp.src)
.xAttrs(rxOp.xAttrs)
.xAttrsRemoved(true).build() };
.xAttrsRemoved(true).build() });
case OP_SET_XATTR:
FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
.path(sxOp.src)
.xAttrs(sxOp.xAttrs)
.xAttrsRemoved(false).build() };
.xAttrsRemoved(false).build() });
case OP_SET_ACL:
FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
return new Event[] { new Event.MetadataUpdateEvent.Builder()
return new EventBatch(op.txid,
new Event[] { new Event.MetadataUpdateEvent.Builder()
.metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
.path(saOp.src)
.acls(saOp.aclEntries).build() };
.acls(saOp.aclEntries).build() });
default:
return null;
}

View File

@ -34,7 +34,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
@ -55,8 +54,8 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
@ -67,8 +66,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventsList;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@ -139,10 +138,16 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
@ -155,19 +160,12 @@ import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolP
import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@ -175,6 +173,7 @@ import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
/**
@ -1670,7 +1669,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
public EventsList getEditsFromTxid(long txid) throws IOException {
public EventBatchList getEditsFromTxid(long txid) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.READ); // only active
namesystem.checkSuperuserPrivilege();
@ -1689,13 +1688,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
// guaranteed to have been written by this NameNode.)
boolean readInProgress = syncTxid > 0;
List<Event> events = Lists.newArrayList();
List<EventBatch> batches = Lists.newArrayList();
int totalEvents = 0;
long maxSeenTxid = -1;
long firstSeenTxid = -1;
if (syncTxid > 0 && txid > syncTxid) {
// we can't read past syncTxid, so there's no point in going any further
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
Collection<EditLogInputStream> streams = null;
@ -1707,7 +1707,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
// will result
LOG.info("NN is transitioning from active to standby and FSEditLog " +
"is closed -- could not read edits");
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
boolean breakOuter = false;
@ -1725,9 +1725,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
break;
}
Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op);
if (eventsFromOp != null) {
events.addAll(Arrays.asList(eventsFromOp));
EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
if (eventBatch != null) {
batches.add(eventBatch);
totalEvents += eventBatch.getEvents().length;
}
if (op.getTransactionId() > maxSeenTxid) {
maxSeenTxid = op.getTransactionId();
@ -1735,7 +1736,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
if (firstSeenTxid == -1) {
firstSeenTxid = op.getTransactionId();
}
if (events.size() >= maxEventsPerRPC || (syncTxid > 0 &&
if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
op.getTransactionId() == syncTxid)) {
// we're done
breakOuter = true;
@ -1750,7 +1751,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
}
return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
}
@Override // TraceAdminProtocol

View File

@ -48,6 +48,11 @@ message EventProto {
required bytes contents = 2;
}
message EventBatchProto {
required int64 txid = 1;
repeated EventProto events = 2;
}
enum INodeType {
I_TYPE_FILE = 0x0;
I_TYPE_DIRECTORY = 0x1;
@ -111,8 +116,9 @@ message UnlinkEventProto {
}
message EventsListProto {
repeated EventProto events = 1;
repeated EventProto events = 1; // deprecated
required int64 firstTxid = 2;
required int64 lastTxid = 3;
required int64 syncTxid = 4;
}
repeated EventBatchProto batch = 5;
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
@ -49,11 +50,17 @@ public class TestDFSInotifyEventInputStream {
private static final Log LOG = LogFactory.getLog(
TestDFSInotifyEventInputStream.class);
private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
throws IOException, MissingEventsException {
Event next = null;
while ((next = eis.poll()) == null);
return next;
EventBatch batch = null;
while ((batch = eis.poll()) == null);
return batch;
}
private static long checkTxid(EventBatch batch, long prevTxid){
Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
"new txid " + batch.getTxid(), prevTxid < batch.getTxid());
return batch.getTxid();
}
/**
@ -64,7 +71,7 @@ public class TestDFSInotifyEventInputStream {
*/
@Test
public void testOpcodeCount() {
Assert.assertTrue(FSEditLogOpCodes.values().length == 47);
Assert.assertEquals(47, FSEditLogOpCodes.values().length);
}
@ -127,30 +134,36 @@ public class TestDFSInotifyEventInputStream {
"user::rwx,user:foo:rw-,group::r--,other::---", true));
client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
Event next = null;
EventBatch batch = null;
// RenameOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
Event.RenameEvent re = (Event.RenameEvent) next;
Assert.assertTrue(re.getDstPath().equals("/file4"));
Assert.assertTrue(re.getSrcPath().equals("/file"));
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
long txid = batch.getTxid();
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
Event.RenameEvent re = (Event.RenameEvent) batch.getEvents()[0];
Assert.assertEquals("/file4", re.getDstPath());
Assert.assertEquals("/file", re.getSrcPath());
Assert.assertTrue(re.getTimestamp() > 0);
long eventsBehind = eis.getEventsBehindEstimate();
long eventsBehind = eis.getTxidsBehindEstimate();
// RenameOldOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
Event.RenameEvent re2 = (Event.RenameEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
Event.RenameEvent re2 = (Event.RenameEvent) batch.getEvents()[0];
Assert.assertTrue(re2.getDstPath().equals("/file2"));
Assert.assertTrue(re2.getSrcPath().equals("/file4"));
Assert.assertTrue(re.getTimestamp() > 0);
// AddOp with overwrite
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce = (Event.CreateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
Assert.assertTrue(ce.getPath().equals("/file2"));
Assert.assertTrue(ce.getCtime() > 0);
@ -159,66 +172,80 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce.getOverwrite());
// CloseOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
Event.CloseEvent ce2 = (Event.CloseEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
Event.CloseEvent ce2 = (Event.CloseEvent) batch.getEvents()[0];
Assert.assertTrue(ce2.getPath().equals("/file2"));
Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0);
// AddOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// CloseOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2"));
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath().equals("/file2"));
// TimesOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue.getPath().equals("/file2"));
Assert.assertTrue(mue.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.TIMES);
// SetReplicationOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue2.getPath().equals("/file2"));
Assert.assertTrue(mue2.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.REPLICATION);
Assert.assertTrue(mue2.getReplication() == 1);
// ConcatDeleteOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(3, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
Assert.assertTrue(batch.getEvents()[1].getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue2 = (Event.UnlinkEvent) batch.getEvents()[1];
Assert.assertTrue(ue2.getPath().equals("/file3"));
Assert.assertTrue(ue2.getTimestamp() > 0);
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
Event.CloseEvent ce3 = (Event.CloseEvent) next;
Assert.assertTrue(batch.getEvents()[2].getEventType() == Event.EventType.CLOSE);
Event.CloseEvent ce3 = (Event.CloseEvent) batch.getEvents()[2];
Assert.assertTrue(ce3.getPath().equals("/file2"));
Assert.assertTrue(ce3.getTimestamp() > 0);
// DeleteOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue = (Event.UnlinkEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK);
Event.UnlinkEvent ue = (Event.UnlinkEvent) batch.getEvents()[0];
Assert.assertTrue(ue.getPath().equals("/file2"));
Assert.assertTrue(ue.getTimestamp() > 0);
// MkdirOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce4 = (Event.CreateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce4 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce4.getiNodeType() ==
Event.CreateEvent.INodeType.DIRECTORY);
Assert.assertTrue(ce4.getPath().equals("/dir"));
@ -227,18 +254,22 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce4.getSymlinkTarget() == null);
// SetPermissionsOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue3.getPath().equals("/dir"));
Assert.assertTrue(mue3.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.PERMS);
Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
// SetOwnerOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue4.getPath().equals("/dir"));
Assert.assertTrue(mue4.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.OWNER);
@ -246,9 +277,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue4.getGroupName().equals("groupname"));
// SymlinkOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce5 = (Event.CreateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Event.CreateEvent ce5 = (Event.CreateEvent) batch.getEvents()[0];
Assert.assertTrue(ce5.getiNodeType() ==
Event.CreateEvent.INodeType.SYMLINK);
Assert.assertTrue(ce5.getPath().equals("/dir2"));
@ -257,9 +290,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
// SetXAttrOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue5.getPath().equals("/file5"));
Assert.assertTrue(mue5.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
@ -268,9 +303,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(!mue5.isxAttrsRemoved());
// RemoveXAttrOp
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue6.getPath().equals("/file5"));
Assert.assertTrue(mue6.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.XATTRS);
@ -279,9 +316,11 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(mue6.isxAttrsRemoved());
// SetAclOp (1)
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue7.getPath().equals("/file5"));
Assert.assertTrue(mue7.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
@ -289,9 +328,11 @@ public class TestDFSInotifyEventInputStream {
AclEntry.parseAclEntry("user::rwx", true)));
// SetAclOp (2)
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next;
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
Assert.assertTrue(mue8.getPath().equals("/file5"));
Assert.assertTrue(mue8.getMetadataType() ==
Event.MetadataUpdateEvent.MetadataType.ACLS);
@ -305,7 +346,7 @@ public class TestDFSInotifyEventInputStream {
// and we should not have been behind at all when eventsBehind was set
// either, since there were few enough events that they should have all
// been read to the client during the first poll() call
Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind);
Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
} finally {
cluster.shutdown();
@ -329,13 +370,14 @@ public class TestDFSInotifyEventInputStream {
}
cluster.getDfsCluster().shutdownNameNode(0);
cluster.getDfsCluster().transitionToActive(1);
Event next = null;
EventBatch batch = null;
// we can read all of the edits logged by the old active from the new
// active
for (int i = 0; i < 10; i++) {
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
@ -369,11 +411,12 @@ public class TestDFSInotifyEventInputStream {
// make sure that the old active can't read any further than the edits
// it logged itself (it has no idea whether the in-progress edits from
// the other writer have actually been committed)
Event next = null;
EventBatch batch = null;
for (int i = 0; i < 10; i++) {
next = waitForNextEvent(eis);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
i));
}
Assert.assertTrue(eis.poll() == null);
@ -414,13 +457,13 @@ public class TestDFSInotifyEventInputStream {
}, 1, TimeUnit.SECONDS);
// a very generous wait period -- the edit will definitely have been
// processed by the time this is up
Event next = eis.poll(5, TimeUnit.SECONDS);
Assert.assertTrue(next != null);
Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
Assert.assertNotNull(batch);
Assert.assertEquals(1, batch.getEvents().length);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
} finally {
cluster.shutdown();
}
}
}