HDFS-6038. Allow JournalNode to handle editlog produced by new release with future layoutversion. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1579813 13f79535-47bb-0310-9956-ffa450edef68
Jing Zhao 2014-03-20 23:06:06 +00:00
parent fd6df7675e
commit 9dab514b22
50 changed files with 661 additions and 269 deletions

View File

@ -23,6 +23,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
/** A reusable {@link DataOutput} implementation that writes to an in-memory
* buffer.
*
@ -68,6 +70,18 @@ public void write(DataInput in, int len) throws IOException {
in.readFully(buf, count, len);
count = newcount;
}
/**
* Set the count for the current buf.
* @param newCount the new count to set
* @return the original count
*/
private int setCount(int newCount) {
Preconditions.checkArgument(newCount >= 0 && newCount <= buf.length);
int oldCount = count;
count = newCount;
return oldCount;
}
}
private Buffer buffer;
@ -110,4 +124,21 @@ public void write(DataInput in, int length) throws IOException {
public void writeTo(OutputStream out) throws IOException {
buffer.writeTo(out);
}
/**
* Overwrite an integer at the given offset in the internal buffer. Note that
* this call can only overwrite existing data in the buffer, i.e., buffer#count
* cannot be increased, and DataOutputStream#written cannot be increased.
*/
public void writeInt(int v, int offset) throws IOException {
Preconditions.checkState(offset + 4 <= buffer.getLength());
byte[] b = new byte[4];
b[0] = (byte) ((v >>> 24) & 0xFF);
b[1] = (byte) ((v >>> 16) & 0xFF);
b[2] = (byte) ((v >>> 8) & 0xFF);
b[3] = (byte) ((v >>> 0) & 0xFF);
int oldCount = buffer.setCount(offset);
buffer.write(b);
buffer.setCount(oldCount);
}
}
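The setCount/writeInt pair above enables a reserve-then-back-patch pattern: write a placeholder, emit the payload, then overwrite the placeholder once the final value is known. A minimal usage sketch, assuming only the DataOutputBuffer API shown in this diff (the variable names are illustrative):

DataOutputBuffer buf = new DataOutputBuffer();
int lengthOffset = buf.getLength(); // where the placeholder will sit
buf.writeInt(0); // reserve 4 bytes for the length
buf.writeLong(42L); // payload whose size is only known afterwards
int payloadLength = buf.getLength() - lengthOffset - 4;
buf.writeInt(payloadLength, lengthOffset); // back-patch; count is restored internally

This is exactly the shape of the FSEditLogOp.Writer change later in this commit, which back-patches each op's length field after writing its body.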

View File

@ -972,6 +972,9 @@ BREAKDOWN OF HDFS-5535 ROLLING UPGRADE SUBTASKS AND RELATED JIRAS
DatanodeRegistration with namenode layout version and namenode node type.
(szetszwo)
HDFS-6038. Allow JournalNode to handle editlog produced by new release with
future layoutversion. (jing9)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -97,7 +97,7 @@ public long getLastTxId() {
}
@Override
public int getVersion() throws IOException {
public int getVersion(boolean verifyVersion) throws IOException {
return logVersion;
}

View File

@ -77,7 +77,7 @@ protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh)
}
@Override
public void create() throws IOException {
public void create(int layoutVersion) throws IOException {
// noop
}

View File

@ -364,7 +364,8 @@ synchronized private void checkEnv() throws IOException {
* @param txId First transaction id to be written to the stream
*/
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
throws IOException {
checkEnv();
if (txId <= maxTxId.get()) {
@ -397,7 +398,7 @@ public EditLogOutputStream startLogSegment(long txId) throws IOException {
try {
String znodePath = inprogressZNode(txId);
EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
HdfsConstants.NAMENODE_LAYOUT_VERSION, currentLedger.getId(), txId);
layoutVersion, currentLedger.getId(), txId);
/* Write the ledger metadata out to the inprogress ledger znode
* This can fail if for some reason our write lock has
* expired (@see WriteLock) and another process has managed to

View File

@ -30,7 +30,6 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Executors;
@ -47,6 +46,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.bookkeeper.proto.BookieServer;
@ -101,7 +101,8 @@ public void testSimpleWrite() throws Exception {
BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -124,7 +125,8 @@ public void testNumberOfTransactions() throws Exception {
BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -147,7 +149,8 @@ public void testNumberOfTransactionsWithGaps() throws Exception {
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
EditLogOutputStream out = bkjm.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
@ -185,7 +188,8 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
EditLogOutputStream out = bkjm.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
@ -198,7 +202,8 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
}
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
EditLogOutputStream out = bkjm.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE/2; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
@ -226,7 +231,8 @@ public void testWriteRestartFrom1() throws Exception {
long txid = 1;
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(txid);
EditLogOutputStream out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
@ -237,7 +243,8 @@ public void testWriteRestartFrom1() throws Exception {
txid = 1;
try {
out = bkjm.startLogSegment(txid);
out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Shouldn't be able to start another journal from " + txid
+ " when one already exists");
} catch (Exception ioe) {
@ -247,7 +254,8 @@ public void testWriteRestartFrom1() throws Exception {
// test border case
txid = DEFAULT_SEGMENT_SIZE;
try {
out = bkjm.startLogSegment(txid);
out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Shouldn't be able to start another journal from " + txid
+ " when one already exists");
} catch (IOException ioe) {
@ -257,7 +265,8 @@ public void testWriteRestartFrom1() throws Exception {
// open journal continuing from before
txid = DEFAULT_SEGMENT_SIZE + 1;
start = txid;
out = bkjm.startLogSegment(start);
out = bkjm.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
assertNotNull(out);
for (long j = 1 ; j <= DEFAULT_SEGMENT_SIZE; j++) {
@ -270,7 +279,8 @@ public void testWriteRestartFrom1() throws Exception {
// open journal arbitrarily far in the future
txid = DEFAULT_SEGMENT_SIZE * 4;
out = bkjm.startLogSegment(txid);
out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
assertNotNull(out);
}
@ -287,9 +297,11 @@ public void testTwoWriters() throws Exception {
BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
EditLogOutputStream out1 = bkjm1.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
try {
bkjm2.startLogSegment(start);
bkjm2.startLogSegment(start,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Shouldn't have been able to open the second writer");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
@ -307,7 +319,8 @@ public void testSimpleRead() throws Exception {
bkjm.format(nsi);
final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= numTransactions; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -334,7 +347,8 @@ public void testSimpleRecovery() throws Exception {
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -384,7 +398,8 @@ public void testAllBookieFailure() throws Exception {
BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(txid);
EditLogOutputStream out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@ -416,7 +431,8 @@ public void testAllBookieFailure() throws Exception {
assertEquals("New bookie didn't start",
numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(txid);
out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
@ -471,7 +487,8 @@ public void testOneBookieFailure() throws Exception {
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(txid);
EditLogOutputStream out = bkjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(txid++);
@ -522,7 +539,8 @@ public void testEmptyInprogressNode() throws Exception {
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -531,7 +549,8 @@ public void testEmptyInprogressNode() throws Exception {
out.close();
bkjm.finalizeLogSegment(1, 100);
out = bkjm.startLogSegment(101);
out = bkjm.startLogSegment(101,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
out.close();
bkjm.close();
String inprogressZNode = bkjm.inprogressZNode(101);
@ -564,7 +583,8 @@ public void testCorruptInprogressNode() throws Exception {
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -573,7 +593,8 @@ public void testCorruptInprogressNode() throws Exception {
out.close();
bkjm.finalizeLogSegment(1, 100);
out = bkjm.startLogSegment(101);
out = bkjm.startLogSegment(101,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
out.close();
bkjm.close();
@ -607,7 +628,8 @@ public void testEmptyInprogressLedger() throws Exception {
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -616,13 +638,15 @@ public void testEmptyInprogressLedger() throws Exception {
out.close();
bkjm.finalizeLogSegment(1, 100);
out = bkjm.startLogSegment(101);
out = bkjm.startLogSegment(101,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
out.close();
bkjm.close();
bkjm = new BookKeeperJournalManager(conf, uri, nsi);
bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(101);
out = bkjm.startLogSegment(101,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -647,7 +671,8 @@ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
nsi);
bkjm.format(nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
EditLogOutputStream out = bkjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = 1; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@ -739,7 +764,7 @@ public void testConcurrentFormat() throws Exception {
= new BookKeeperJournalManager(conf, uri, nsi);
bkjm.format(nsi);
for (int i = 1; i < 100*2; i += 2) {
bkjm.startLogSegment(i);
bkjm.startLogSegment(i, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
bkjm.finalizeLogSegment(i, i+1);
}
bkjm.close();
@ -800,7 +825,8 @@ public ThreadStatus call() {
private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
int startTxid, int endTxid) throws IOException, KeeperException,
InterruptedException {
EditLogOutputStream out = bkjm.startLogSegment(startTxid);
EditLogOutputStream out = bkjm.startLogSegment(startTxid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long i = startTxid; i <= endTxid; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);

View File

@ -67,8 +67,9 @@ public ListenableFuture<Void> sendEdits(
* Begin writing a new log segment.
*
* @param txid the first txid to be written to the new log
* @param layoutVersion the LayoutVersion of the log
*/
public ListenableFuture<Void> startLogSegment(long txid);
public ListenableFuture<Void> startLogSegment(long txid, int layoutVersion);
/**
* Finalize a log segment.

View File

@ -233,10 +233,10 @@ public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
}
public QuorumCall<AsyncLogger, Void> startLogSegment(
long txid) {
long txid, int layoutVersion) {
Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
calls.put(logger, logger.startLogSegment(txid));
calls.put(logger, logger.startLogSegment(txid, layoutVersion));
}
return QuorumCall.create(calls);
}

View File

@ -258,8 +258,7 @@ public URL buildURLToFetchLogs(long segmentTxId) {
private synchronized RequestInfo createReqInfo() {
Preconditions.checkState(epoch > 0, "bad epoch: " + epoch);
return new RequestInfo(journalId, epoch, ipcSerial++,
committedTxId);
return new RequestInfo(journalId, epoch, ipcSerial++, committedTxId);
}
@VisibleForTesting
@ -475,11 +474,12 @@ public Void call() throws Exception {
}
@Override
public ListenableFuture<Void> startLogSegment(final long txid) {
public ListenableFuture<Void> startLogSegment(final long txid,
final int layoutVersion) {
return executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().startLogSegment(createReqInfo(), txid);
getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
synchronized (IPCLoggerChannel.this) {
if (outOfSync) {
outOfSync = false;

View File

@ -394,10 +394,12 @@ private static List<InetSocketAddress> getLoggerAddresses(URI uri)
}
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
throws IOException {
Preconditions.checkState(isActiveWriter,
"must recover segments before starting a new one");
QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId,
layoutVersion);
loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
"startLogSegment(" + txId + ")");
return new QuorumOutputStream(loggers, txId,

View File

@ -55,7 +55,7 @@ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
}
@Override
public void create() throws IOException {
public void create(int layoutVersion) throws IOException {
throw new UnsupportedOperationException();
}

View File

@ -100,9 +100,10 @@ public void journal(RequestInfo reqInfo,
* using {@link #finalizeLogSegment(RequestInfo, long, long)}.
*
* @param txid the first txid in the new log
* @param layoutVersion the LayoutVersion of the new log
*/
public void startLogSegment(RequestInfo reqInfo,
long txid) throws IOException;
long txid, int layoutVersion) throws IOException;
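Taken together with journal() and finalizeLogSegment(RequestInfo, long, long), the expected lifecycle of one segment against this protocol looks roughly like the following sketch (reqInfo and editBytes are assumed to be prepared by the caller; "proxy" is an illustrative QJournalProtocol handle):

proxy.startLogSegment(reqInfo, 1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
proxy.journal(reqInfo, 1L, 1L, 3, editBytes); // segment txid, first txid, op count, data
proxy.finalizeLogSegment(reqInfo, 1L, 3L);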
/**
* Finalize the given log segment on the JournalNode. The segment

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import com.google.protobuf.RpcController;
@ -180,8 +181,10 @@ public HeartbeatResponseProto heartbeat(RpcController controller,
public StartLogSegmentResponseProto startLogSegment(RpcController controller,
StartLogSegmentRequestProto req) throws ServiceException {
try {
impl.startLogSegment(convert(req.getReqInfo()),
req.getTxid());
int layoutVersion = req.hasLayoutVersion() ? req.getLayoutVersion()
: NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
impl.startLogSegment(convert(req.getReqInfo()), req.getTxid(),
layoutVersion);
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -194,11 +194,11 @@ private QJournalProtocolProtos.RequestInfoProto convert(
}
@Override
public void startLogSegment(RequestInfo reqInfo, long txid)
public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion)
throws IOException {
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
.setReqInfo(convert(reqInfo))
.setTxid(txid)
.setTxid(txid).setLayoutVersion(layoutVersion)
.build();
try {
rpcProxy.startLogSegment(NULL_CONTROLLER, req);

View File

@ -188,7 +188,7 @@ private synchronized EditLogFile scanStorageForLatestEdits() throws IOException
while (!files.isEmpty()) {
EditLogFile latestLog = files.remove(files.size() - 1);
latestLog.validateLog();
latestLog.scanLog();
LOG.info("Latest log is " + latestLog);
if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) {
// the log contains no transactions
@ -489,8 +489,8 @@ private void alwaysAssert(boolean expression, String msg,
* Start a new segment at the given txid. The previous segment
* must have already been finalized.
*/
public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
throws IOException {
public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
int layoutVersion) throws IOException {
assert fjm != null;
checkFormatted();
checkRequest(reqInfo);
@ -518,7 +518,7 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
// If it's in-progress, it should only contain one transaction,
// because the "startLogSegment" transaction is written alone at the
// start of each segment.
existing.validateLog();
existing.scanLog();
if (existing.getLastTxId() != existing.getFirstTxId()) {
throw new IllegalStateException("The log file " +
existing + " seems to contain valid transactions");
@ -539,7 +539,7 @@ public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
// remove the record of the older segment here.
purgePaxosDecision(txid);
curSegment = fjm.startLogSegment(txid);
curSegment = fjm.startLogSegment(txid, layoutVersion);
curSegmentTxId = txid;
nextTxId = txid;
}
@ -581,7 +581,7 @@ public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
if (needsValidation) {
LOG.info("Validating log segment " + elf.getFile() + " about to be " +
"finalized");
elf.validateLog();
elf.scanLog();
checkSync(elf.getLastTxId() == endTxId,
"Trying to finalize in-progress log segment %s to end at " +
@ -660,14 +660,15 @@ public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
* @return the current state of the given segment, or null if the
* segment does not exist.
*/
private SegmentStateProto getSegmentInfo(long segmentTxId)
@VisibleForTesting
SegmentStateProto getSegmentInfo(long segmentTxId)
throws IOException {
EditLogFile elf = fjm.getLogFile(segmentTxId);
if (elf == null) {
return null;
}
if (elf.isInProgress()) {
elf.validateLog();
elf.scanLog();
}
if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
LOG.info("Edit log file " + elf + " appears to be empty. " +

View File

@ -156,10 +156,10 @@ public void heartbeat(RequestInfo reqInfo) throws IOException {
}
@Override
public void startLogSegment(RequestInfo reqInfo, long txid)
public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion)
throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
.startLogSegment(reqInfo, txid);
.startLogSegment(reqInfo, txid, layoutVersion);
}
@Override

View File

@ -56,7 +56,8 @@ public boolean hasSomeData() {
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
throws IOException {
EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
journalInfo);
stm.startLogSegment(txId);

View File

@ -92,7 +92,7 @@ protected FSEditLogOp nextValidOp() {
}
@Override
public int getVersion() throws IOException {
public int getVersion(boolean verifyVersion) throws IOException {
return this.version;
}

View File

@ -86,7 +86,7 @@ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
* There is no persistent storage. Just clear the buffers.
*/
@Override // EditLogOutputStream
public void create() throws IOException {
public void create(int layoutVersion) throws IOException {
assert doubleBuf.isFlushed() : "previous data is not flushed yet";
this.doubleBuf = new EditsDoubleBuffer(DEFAULT_BUFFER_SIZE);
}

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.LayoutFlags;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
@ -135,7 +136,8 @@ private EditLogFileInputStream(LogSource log,
this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
}
private void init() throws LogHeaderCorruptException, IOException {
private void init(boolean verifyLayoutVersion)
throws LogHeaderCorruptException, IOException {
Preconditions.checkState(state == State.UNINIT);
BufferedInputStream bin = null;
try {
@ -144,12 +146,14 @@ private void init() throws LogHeaderCorruptException, IOException {
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
dataIn = new DataInputStream(tracker);
try {
logVersion = readLogVersion(dataIn);
logVersion = readLogVersion(dataIn, verifyLayoutVersion);
} catch (EOFException eofe) {
throw new LogHeaderCorruptException("No header found in log");
}
// We assume future layouts will also support ADD_LAYOUT_FLAGS. Layout
// versions are negative, so logVersion < CURRENT_LAYOUT_VERSION below
// means the log was written by a newer (future) release.
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.ADD_LAYOUT_FLAGS, logVersion)) {
LayoutVersion.Feature.ADD_LAYOUT_FLAGS, logVersion) ||
logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
try {
LayoutFlags.read(dataIn);
} catch (EOFException eofe) {
@ -188,7 +192,7 @@ private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
switch (state) {
case UNINIT:
try {
init();
init(true);
} catch (Throwable e) {
LOG.error("caught exception initializing " + this, e);
if (skipBrokenEdits) {
@ -237,6 +241,13 @@ private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
return op;
}
@Override
protected long scanNextOp() throws IOException {
Preconditions.checkState(state == State.OPEN);
FSEditLogOp cachedNext = getCachedOp();
return cachedNext == null ? reader.scanOp() : cachedNext.txid;
}
@Override
protected FSEditLogOp nextOp() throws IOException {
return nextOpImpl(false);
@ -253,9 +264,9 @@ protected FSEditLogOp nextValidOp() {
}
@Override
public int getVersion() throws IOException {
public int getVersion(boolean verifyVersion) throws IOException {
if (state == State.UNINIT) {
init();
init(verifyVersion);
}
return logVersion;
}
@ -293,11 +304,12 @@ public String toString() {
return getName();
}
static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOException {
static FSEditLogLoader.EditLogValidation validateEditLog(File file)
throws IOException {
EditLogFileInputStream in;
try {
in = new EditLogFileInputStream(file);
in.getVersion(); // causes us to read the header
in.getVersion(true); // causes us to read the header
} catch (LogHeaderCorruptException e) {
// If the header is malformed or the wrong value, this indicates a corruption
LOG.warn("Log file " + file + " has no valid header", e);
@ -312,6 +324,51 @@ static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOExc
}
}
static FSEditLogLoader.EditLogValidation scanEditLog(File file)
throws IOException {
EditLogFileInputStream in;
try {
in = new EditLogFileInputStream(file);
// read the header and initialize the input stream, but do not verify
// the layout version
in.getVersion(false);
} catch (LogHeaderCorruptException e) {
LOG.warn("Log file " + file + " has no valid header", e);
return new FSEditLogLoader.EditLogValidation(0,
HdfsConstants.INVALID_TXID, true);
}
long lastPos = 0;
long lastTxId = HdfsConstants.INVALID_TXID;
long numValid = 0;
try {
while (true) {
long txid = HdfsConstants.INVALID_TXID;
lastPos = in.getPosition();
try {
if ((txid = in.scanNextOp()) == HdfsConstants.INVALID_TXID) {
break;
}
} catch (Throwable t) {
FSImage.LOG.warn("Caught exception after scanning through "
+ numValid + " ops from " + in
+ " while determining its valid length. Position was "
+ lastPos, t);
in.resync();
FSImage.LOG.warn("After resync, position is " + in.getPosition());
continue;
}
if (lastTxId == HdfsConstants.INVALID_TXID || txid > lastTxId) {
lastTxId = txid;
}
numValid++;
}
return new EditLogValidation(lastPos, lastTxId, false);
} finally {
IOUtils.closeStream(in);
}
}
/**
* Read the header of fsedit log
* @param in fsedit stream
@ -319,7 +376,7 @@ static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOExc
* @throws IOException if error occurs
*/
@VisibleForTesting
static int readLogVersion(DataInputStream in)
static int readLogVersion(DataInputStream in, boolean verifyLayoutVersion)
throws IOException, LogHeaderCorruptException {
int logVersion;
try {
@ -328,8 +385,9 @@ static int readLogVersion(DataInputStream in)
throw new LogHeaderCorruptException(
"Reached EOF when reading log header");
}
if (logVersion < HdfsConstants.NAMENODE_LAYOUT_VERSION || // future version
logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION) { // unsupported
if (verifyLayoutVersion &&
(logVersion < HdfsConstants.NAMENODE_LAYOUT_VERSION || // future version
logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION)) { // unsupported
throw new LogHeaderCorruptException(
"Unexpected version of the file system log file: "
+ logVersion + ". Current version = "

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutFlags;
import org.apache.hadoop.io.IOUtils;
@ -115,10 +114,10 @@ public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
* Create empty edits logs file.
*/
@Override
public void create() throws IOException {
public void create(int layoutVersion) throws IOException {
fc.truncate(0);
fc.position(0);
writeHeader(doubleBuf.getCurrentBuf());
writeHeader(layoutVersion, doubleBuf.getCurrentBuf());
setReadyToFlush();
flush();
}
@ -127,12 +126,14 @@ public void create() throws IOException {
* Write header information for this EditLogFileOutputStream to the provided
* DataOutputSream.
*
* @param layoutVersion the LayoutVersion of the EditLog
* @param out the output stream to write the header to.
* @throws IOException in the event of error writing to the stream.
*/
@VisibleForTesting
public static void writeHeader(DataOutputStream out) throws IOException {
out.writeInt(HdfsConstants.NAMENODE_LAYOUT_VERSION);
public static void writeHeader(int layoutVersion, DataOutputStream out)
throws IOException {
out.writeInt(layoutVersion);
LayoutFlags.write(out);
}
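Reading that header back mirrors this exactly; a hedged sketch following readLogVersion's logic above (editsFile and the surrounding stream setup are assumed):

DataInputStream in = new DataInputStream(new FileInputStream(editsFile));
int logVersion = in.readInt(); // the 4-byte, negative layout version
LayoutFlags.read(in); // the layout flags section (no flags are currently defined)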

View File

@ -19,6 +19,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import java.io.Closeable;
import java.io.IOException;
@ -103,6 +105,15 @@ public void resync() {
* @throws IOException if there is an error reading from the stream
*/
protected abstract FSEditLogOp nextOp() throws IOException;
/**
* Scan over the next operation from the stream storage, without necessarily
* decoding its contents. The default implementation decodes the op in full;
* subclasses that know the on-disk op length can override this to skip ahead.
* @return the txid of the next operation, or INVALID_TXID if none remains.
*/
protected long scanNextOp() throws IOException {
FSEditLogOp next = readOp();
return next != null ? next.txid : HdfsConstants.INVALID_TXID;
}
/**
* Get the next valid operation from the stream storage.
@ -147,13 +158,22 @@ public boolean skipUntil(long txid) throws IOException {
}
}
}
/**
* Return the cached op, resetting it to null.
*/
FSEditLogOp getCachedOp() {
FSEditLogOp op = this.cachedOp;
cachedOp = null;
return op;
}
/**
* Get the layout version of the data in the stream.
* @return the layout version of the ops in the stream.
* @throws IOException if there is an error reading the version
*/
public abstract int getVersion() throws IOException;
public abstract int getVersion(boolean verifyVersion) throws IOException;
/**
* Get the "position" of in the stream. This is useful for

View File

@ -65,9 +65,10 @@ abstract public void writeRaw(byte[] bytes, int offset, int length)
/**
* Create and initialize underlying persistent edits log storage.
*
* @param layoutVersion The LayoutVersion of the journal
* @throws IOException
*/
abstract public void create() throws IOException;
abstract public void create(int layoutVersion) throws IOException;
/**
* Close the journal.

View File

@ -1158,7 +1158,8 @@ private void startLogSegment(final long segmentTxId) throws IOException {
storage.attemptRestoreRemovedStorage();
try {
editLogStream = journalSet.startLogSegment(segmentTxId);
editLogStream = journalSet.startLogSegment(segmentTxId,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
} catch (IOException ex) {
throw new IOException("Unable to start log segment " +
segmentTxId + ": too few journals successfully started.", ex);

View File

@ -182,7 +182,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
}
} catch (Throwable e) {
// Handle a problem with our input
check203UpgradeFailure(in.getVersion(), e);
check203UpgradeFailure(in.getVersion(true), e);
String errorMessage =
formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
FSImage.LOG.error(errorMessage, e);
@ -221,7 +221,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
+ ", numEdits=" + numEdits + ", totalEdits=" + totalEdits);
}
long inodeId = applyEditLogOp(op, fsDir, startOpt,
in.getVersion(), lastInodeId);
in.getVersion(true), lastInodeId);
if (lastInodeId < inodeId) {
lastInodeId = inodeId;
}
@ -1024,6 +1024,34 @@ static EditLogValidation validateEditLog(EditLogInputStream in) {
return new EditLogValidation(lastPos, lastTxId, false);
}
static EditLogValidation scanEditLog(EditLogInputStream in) {
long lastPos = 0;
long lastTxId = HdfsConstants.INVALID_TXID;
long numValid = 0;
FSEditLogOp op = null;
while (true) {
lastPos = in.getPosition();
try {
if ((op = in.readOp()) == null) { // TODO
break;
}
} catch (Throwable t) {
FSImage.LOG.warn("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length." +
"Position was " + lastPos, t);
in.resync();
FSImage.LOG.warn("After resync, position is " + in.getPosition());
continue;
}
if (lastTxId == HdfsConstants.INVALID_TXID
|| op.getTransactionId() > lastTxId) {
lastTxId = op.getTransactionId();
}
numValid++;
}
return new EditLogValidation(lastPos, lastTxId, false);
}
static class EditLogValidation {
private final long validLength;
private final long endTxId;

View File

@ -116,6 +116,7 @@
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@ -206,7 +207,8 @@ private static ImmutableMap<String, FsAction> fsActionMap() {
* Constructor for an EditLog Op. EditLog ops cannot be constructed
* directly, but only through Reader#readOp.
*/
private FSEditLogOp(FSEditLogOpCodes opCode) {
@VisibleForTesting
protected FSEditLogOp(FSEditLogOpCodes opCode) {
this.opCode = opCode;
}
@ -3504,6 +3506,9 @@ static SetAclOp getInstance() {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
AclEditLogProto p = AclEditLogProto.parseDelimitedFrom((DataInputStream)in);
if (p == null) {
throw new IOException("Failed to read fields from SetAclOp");
}
src = p.getSrc();
aclEntries = PBHelper.convertAclEntry(p.getEntriesList());
}
@ -3658,10 +3663,18 @@ public Writer(DataOutputBuffer out) {
*/
public void writeOp(FSEditLogOp op) throws IOException {
int start = buf.getLength();
// write the op code first to make padding and terminator verification
// work
buf.writeByte(op.opCode.getOpCode());
buf.writeInt(0); // write 0 for the length first
buf.writeLong(op.txid);
op.writeFields(buf);
int end = buf.getLength();
// Back-patch the length field. The value counts every byte written after
// the opcode so far (the 4-byte length field itself, the 8-byte txid, and
// the op fields); since the length field and the 4-byte checksum appended
// below are the same size, scanOp can skip (length - 8) bytes after
// reading the length and txid to land exactly past the checksum.
int length = end - start - 1;
buf.writeInt(length, start + 1);
checksum.reset();
checksum.update(buf.getData(), start, end-start);
int sum = (int)checksum.getValue();
@ -3679,6 +3692,7 @@ public static class Reader {
private final Checksum checksum;
private final OpInstanceCache cache;
private int maxOpSize;
private final boolean supportEditLogLength;
/**
* Construct the reader
@ -3693,6 +3707,12 @@ public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
} else {
this.checksum = null;
}
// It is possible that logVersion is actually a future layout version during
// a rolling upgrade (e.g., the NN gets upgraded first). We assume future
// layouts will also record the length of each edit log op.
this.supportEditLogLength = NameNodeLayoutVersion.supports(
NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)
|| logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
if (this.checksum != null) {
this.in = new DataInputStream(
@ -3827,6 +3847,10 @@ private FSEditLogOp decodeOp() throws IOException {
throw new IOException("Read invalid opcode " + opCode);
}
if (supportEditLogLength) {
in.readInt();
}
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
// Read the txid
@ -3841,6 +3865,42 @@ private FSEditLogOp decodeOp() throws IOException {
return op;
}
/**
* Similar to decodeOp(), but instead of fully decoding the op, skip over its
* content when the edit log records op lengths.
* @return the txid of the scanned op, or INVALID_TXID if there are no more
* valid ops in the segment
*/
public long scanOp() throws IOException {
if (supportEditLogLength) {
limiter.setLimit(maxOpSize);
in.mark(maxOpSize);
final byte opCodeByte;
try {
opCodeByte = in.readByte(); // op code
} catch (EOFException e) {
return HdfsConstants.INVALID_TXID;
}
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID) {
verifyTerminator();
return HdfsConstants.INVALID_TXID;
}
int length = in.readInt(); // read the length of the op
long txid = in.readLong(); // read the txid
// skip the remaining content
IOUtils.skipFully(in, length - 8);
// TODO: do we want to verify checksum for JN? For now we don't.
return txid;
} else {
FSEditLogOp op = decodeOp();
return op == null ? HdfsConstants.INVALID_TXID : op.getTransactionId();
}
}
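For reference, the on-disk shape of one op under EDITLOG_LENGTH, as reconstructed from writeOp() and scanOp() in this patch (a sketch, not a normative spec; sizes in bytes):

opcode(1) | length(4) | txid(8) | op fields(N) | checksum(4), with length = 4 + 8 + N

The length counts every byte after the opcode except the trailing checksum; because the length field and the checksum are both 4 bytes, skipping (length - 8) bytes after consuming the length and txid lands exactly past the checksum, at the start of the next op.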
/**
* Validate a transaction's checksum
*/

View File

@ -103,13 +103,13 @@ public boolean hasSomeData() {
}
@Override
synchronized public EditLogOutputStream startLogSegment(long txid)
throws IOException {
synchronized public EditLogOutputStream startLogSegment(long txid,
int layoutVersion) throws IOException {
try {
currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
EditLogOutputStream stm = new EditLogFileOutputStream(conf,
currentInProgress, outputBufferCapacity);
stm.create();
stm.create(layoutVersion);
return stm;
} catch (IOException e) {
LOG.warn("Unable to start log segment " + txid +
@ -476,6 +476,12 @@ public void validateLog() throws IOException {
this.hasCorruptHeader = val.hasCorruptHeader();
}
public void scanLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.scanEditLog(file);
this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader();
}
public boolean isInProgress() {
return isInProgress;
}

View File

@ -49,7 +49,8 @@ public interface JournalManager extends Closeable, FormatConfirmable,
* Begin writing to a new segment of the log stream, which starts at
* the given transaction ID.
*/
EditLogOutputStream startLogSegment(long txId) throws IOException;
EditLogOutputStream startLogSegment(long txId, int layoutVersion)
throws IOException;
/**
* Mark the log segment that spans from firstTxId to lastTxId

View File

@ -89,10 +89,10 @@ public JournalAndStream(JournalManager manager, boolean required,
this.shared = shared;
}
public void startLogSegment(long txId) throws IOException {
public void startLogSegment(long txId, int layoutVersion) throws IOException {
Preconditions.checkState(stream == null);
disabled = false;
stream = journal.startLogSegment(txId);
stream = journal.startLogSegment(txId, layoutVersion);
}
/**
@ -200,11 +200,12 @@ public boolean hasSomeData() throws IOException {
@Override
public EditLogOutputStream startLogSegment(final long txId) throws IOException {
public EditLogOutputStream startLogSegment(final long txId,
final int layoutVersion) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.startLogSegment(txId);
jas.startLogSegment(txId, layoutVersion);
}
}, "starting log segment " + txId);
return new JournalSetOutputStream();
@ -433,12 +434,12 @@ public void apply(JournalAndStream jas) throws IOException {
}
@Override
public void create() throws IOException {
public void create(final int layoutVersion) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().create();
jas.getCurrentStream().create(layoutVersion);
}
}
}, "create");

View File

@ -63,7 +63,8 @@ public static boolean supports(final LayoutFeature f, final int lv) {
* </ul>
*/
public static enum Feature implements LayoutFeature {
ROLLING_UPGRADE(-55, -53, "Support rolling upgrade", false);
ROLLING_UPGRADE(-55, -53, "Support rolling upgrade", false),
EDITLOG_LENGTH(-56, "Add length field to every edit log op");
private final FeatureInfo info;
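Because layout versions grow more negative as features are added, "newer" always compares less-than, which is what the "future version" checks elsewhere in this patch rely on. A tiny illustrative check (the constants are assumptions, not authoritative):

int current = -56; // e.g. CURRENT_LAYOUT_VERSION once EDITLOG_LENGTH is in
int future = current - 1; // -57: produced by a still-newer release
assert future < current; // so logVersion < CURRENT_LAYOUT_VERSION means a future log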

View File

@ -247,8 +247,8 @@ protected FSEditLogOp nextOp() throws IOException {
}
@Override
public int getVersion() throws IOException {
return streams[curIdx].getVersion();
public int getVersion(boolean verifyVersion) throws IOException {
return streams[curIdx].getVersion(verifyVersion);
}
@Override

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
/**
* BinaryEditsVisitor implements a binary EditsVisitor
@ -42,7 +43,7 @@ public class BinaryEditsVisitor implements OfflineEditsVisitor {
public BinaryEditsVisitor(String outputName) throws IOException {
this.elfos = new EditLogFileOutputStream(new Configuration(),
new File(outputName), 0);
elfos.create();
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
}
/**

View File

@ -61,7 +61,7 @@ public OfflineEditsBinaryLoader(OfflineEditsVisitor visitor,
@Override
public void loadEdits() throws IOException {
try {
visitor.start(inputStream.getVersion());
visitor.start(inputStream.getVersion(true));
while (true) {
try {
FSEditLogOp op = inputStream.readOp();

View File

@ -94,6 +94,7 @@ message HeartbeatResponseProto { // void response
message StartLogSegmentRequestProto {
required RequestInfoProto reqInfo = 1;
required uint64 txid = 2; // Transaction ID
optional sint32 layoutVersion = 3; // the LayoutVersion in the client
}
message StartLogSegmentResponseProto {

View File

@ -36,6 +36,8 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.TestEditLog;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
@ -59,11 +61,28 @@ public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
return Arrays.copyOf(buf.getData(), buf.getLength());
}
/**
* Generate a byte array representing a set of GarbageMkdirOps.
*/
public static byte[] createGabageTxns(long startTxId, int numTxns)
throws IOException {
DataOutputBuffer buf = new DataOutputBuffer();
FSEditLogOp.Writer writer = new FSEditLogOp.Writer(buf);
for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
FSEditLogOp op = new TestEditLog.GarbageMkdirOp();
op.setTransactionId(txid);
writer.writeOp(op);
}
return Arrays.copyOf(buf.getData(), buf.getLength());
}
public static EditLogOutputStream writeSegment(MiniJournalCluster cluster,
QuorumJournalManager qjm, long startTxId, int numTxns,
boolean finalize) throws IOException {
EditLogOutputStream stm = qjm.startLogSegment(startTxId);
EditLogOutputStream stm = qjm.startLogSegment(startTxId,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// Should create in-progress
assertExistsInQuorum(cluster,
NNStorage.getInProgressEditsFileName(startTxId));

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
@ -172,7 +173,7 @@ public void testStopSendingEditsWhenOutOfSync() throws Exception {
Mockito.<RequestInfo>any());
// After a roll, sending new edits should not fail.
ch.startLogSegment(3L).get();
ch.startLogSegment(3L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
assertFalse(ch.isOutOfSync());
ch.sendEdits(3L, 3L, 1, FAKE_DATA).get();

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.io.IOUtils;
@ -287,7 +288,8 @@ private long writeSegmentUntilCrash(MiniJournalCluster cluster,
long firstTxId = txid;
long lastAcked = txid - 1;
try {
EditLogOutputStream stm = qjm.startLogSegment(txid);
EditLogOutputStream stm = qjm.startLogSegment(txid,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (int i = 0; i < numTxns; i++) {
QJMTestUtil.writeTxns(stm, txid++, 1);

View File

@ -17,13 +17,17 @@
*/
package org.apache.hadoop.hdfs.qjournal.client;
import static org.junit.Assert.*;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.File;
@ -49,6 +53,7 @@
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -259,7 +264,8 @@ public void testCrashAtBeginningOfSegment() throws Exception {
writeSegment(cluster, qjm, 1, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
EditLogOutputStream stm = qjm.startLogSegment(4);
EditLogOutputStream stm = qjm.startLogSegment(4,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
try {
waitForAllPendingCalls(qjm.getLoggerSetForTests());
} finally {
@ -306,7 +312,8 @@ public void doTestOutOfSyncAtBeginningOfSegment(int nodeWithOneTxn)
cluster.getJournalNode(nodeMissingSegment).stopAndJoin(0);
// Open segment on 2/3 nodes
EditLogOutputStream stm = qjm.startLogSegment(4);
EditLogOutputStream stm = qjm.startLogSegment(4,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
try {
waitForAllPendingCalls(qjm.getLoggerSetForTests());
@ -456,13 +463,15 @@ public void testMissFinalizeAndNextStart() throws Exception {
futureThrows(new IOException("injected")).when(spies.get(0))
.finalizeLogSegment(Mockito.eq(1L), Mockito.eq(3L));
futureThrows(new IOException("injected")).when(spies.get(0))
.startLogSegment(Mockito.eq(4L));
.startLogSegment(Mockito.eq(4L),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
// Logger 1: fail at txn id 4
failLoggerAtTxn(spies.get(1), 4L);
writeSegment(cluster, qjm, 1, 3, true);
EditLogOutputStream stm = qjm.startLogSegment(4);
EditLogOutputStream stm = qjm.startLogSegment(4,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
try {
writeTxns(stm, 4, 1);
fail("Did not fail to write");
@ -544,7 +553,8 @@ public void testRecoverAfterIncompleteRecovery() throws Exception {
* None of the loggers have any associated paxos info.
*/
private void setupLoggers345() throws Exception {
EditLogOutputStream stm = qjm.startLogSegment(1);
EditLogOutputStream stm = qjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
failLoggerAtTxn(spies.get(0), 4);
failLoggerAtTxn(spies.get(1), 5);

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@ -112,30 +113,39 @@ static Stubber futureThrows(Throwable t) {
@Test
public void testAllLoggersStartOk() throws Exception {
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
qjm.startLogSegment(1);
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
}
@Test
public void testQuorumOfLoggersStartOk() throws Exception {
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureThrows(new IOException("logger failed"))
.when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
qjm.startLogSegment(1);
.when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
}
@Test
public void testQuorumOfLoggersFail() throws Exception {
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureThrows(new IOException("logger failed"))
.when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
.when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureThrows(new IOException("logger failed"))
.when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
.when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
try {
qjm.startLogSegment(1);
qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Did not throw when quorum failed");
} catch (QuorumException qe) {
GenericTestUtils.assertExceptionContains("logger failed", qe);
@ -144,10 +154,14 @@ public void testQuorumOfLoggersFail() throws Exception {
@Test
public void testQuorumOutputStreamReport() throws Exception {
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
QuorumOutputStream os = (QuorumOutputStream) qjm.startLogSegment(1);
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
QuorumOutputStream os = (QuorumOutputStream) qjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
String report = os.generateReport();
Assert.assertFalse("Report should be plain text", report.contains("<"));
}
@ -203,10 +217,14 @@ public void testWriteEditsOneSlow() throws Exception {
}
private EditLogOutputStream createLogSegment() throws IOException {
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
EditLogOutputStream stm = qjm.startLogSegment(1);
futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong(),
Mockito.eq(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION));
EditLogOutputStream stm = qjm.startLogSegment(1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
return stm;
}
}

View File

@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@ -26,18 +29,23 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.qjournal.server.Journal;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestJournal {
@ -77,7 +85,36 @@ public void verifyNoStorageErrors() throws Exception{
public void cleanup() {
IOUtils.closeStream(journal);
}
/**
* Test whether JNs can correctly handle an edit log that cannot be decoded.
*/
@Test
public void testScanEditLog() throws Exception {
// use a future layout version (layout versions are negative, so
// CURRENT_LAYOUT_VERSION - 1 is newer than the current release)
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1);
// write garbage edit log ops into the segment; they can be scanned but
// not decoded
final int numTxns = 5;
byte[] ops = QJMTestUtil.createGabageTxns(1, numTxns);
journal.journal(makeRI(2), 1, 1, numTxns, ops);
// verify the in-progress editlog segment
SegmentStateProto segmentState = journal.getSegmentInfo(1);
assertTrue(segmentState.getIsInProgress());
Assert.assertEquals(numTxns, segmentState.getEndTxId());
Assert.assertEquals(1, segmentState.getStartTxId());
// finalize the segment and verify it again
journal.finalizeLogSegment(makeRI(3), 1, numTxns);
segmentState = journal.getSegmentInfo(1);
assertFalse(segmentState.getIsInProgress());
Assert.assertEquals(numTxns, segmentState.getEndTxId());
Assert.assertEquals(1, segmentState.getStartTxId());
}
@Test (timeout = 10000)
public void testEpochHandling() throws Exception {
assertEquals(0, journal.getLastPromisedEpoch());
@ -96,7 +133,8 @@ public void testEpochHandling() throws Exception {
"Proposed epoch 3 <= last promise 3", ioe);
}
try {
journal.startLogSegment(makeRI(1), 12345L);
journal.startLogSegment(makeRI(1), 12345L,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Should have rejected call from prior epoch");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
@ -114,7 +152,8 @@ public void testEpochHandling() throws Exception {
@Test (timeout = 10000)
public void testMaintainCommittedTxId() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
journal.startLogSegment(makeRI(1), 1);
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// Send txids 1-3, with a request indicating only 0 committed
journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
QJMTestUtil.createTxnData(1, 3));
@ -129,7 +168,8 @@ public void testMaintainCommittedTxId() throws Exception {
@Test (timeout = 10000)
public void testRestartJournal() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
journal.startLogSegment(makeRI(1), 1);
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(2), 1, 1, 2,
QJMTestUtil.createTxnData(1, 2));
// Don't finalize.
@ -153,7 +193,8 @@ public void testRestartJournal() throws Exception {
@Test (timeout = 10000)
public void testFormatResetsCachedValues() throws Exception {
journal.newEpoch(FAKE_NSINFO, 12345L);
journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L);
journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
assertEquals(12345L, journal.getLastPromisedEpoch());
assertEquals(12345L, journal.getLastWriterEpoch());
@ -176,11 +217,13 @@ public void testFormatResetsCachedValues() throws Exception {
@Test (timeout = 10000)
public void testNewEpochAtBeginningOfSegment() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
journal.startLogSegment(makeRI(1), 1);
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(2), 1, 1, 2,
QJMTestUtil.createTxnData(1, 2));
journal.finalizeLogSegment(makeRI(3), 1, 2);
journal.startLogSegment(makeRI(4), 3);
journal.startLogSegment(makeRI(4), 3,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
assertEquals(1, resp.getLastSegmentTxId());
}
@ -219,7 +262,8 @@ public void testJournalLocking() throws Exception {
@Test (timeout = 10000)
public void testFinalizeWhenEditsAreMissed() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
journal.startLogSegment(makeRI(1), 1);
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(2), 1, 1, 3,
QJMTestUtil.createTxnData(1, 3));
@ -276,7 +320,8 @@ public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
journal.newEpoch(FAKE_NSINFO, 1);
// Start a segment at txid 1, and write a batch of 3 txns.
journal.startLogSegment(makeRI(1), 1);
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(2), 1, 1, 3,
QJMTestUtil.createTxnData(1, 3));
@ -285,7 +330,8 @@ public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
// Try to start new segment at txid 6, this should abort old segment and
// then succeed, allowing us to write txid 6-9.
journal.startLogSegment(makeRI(3), 6);
journal.startLogSegment(makeRI(3), 6,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(4), 6, 6, 3,
QJMTestUtil.createTxnData(6, 3));
@ -306,14 +352,16 @@ public void testStartLogSegmentWhenAlreadyExists() throws Exception {
// Start a segment at txid 1, and write just 1 transaction. This
// would normally be the START_LOG_SEGMENT transaction.
journal.startLogSegment(makeRI(1), 1);
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(2), 1, 1, 1,
QJMTestUtil.createTxnData(1, 1));
// Try to start new segment at txid 1, this should succeed, because
// we are allowed to re-start a segment if we only ever had the
// START_LOG_SEGMENT transaction logged.
journal.startLogSegment(makeRI(3), 1);
journal.startLogSegment(makeRI(3), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
journal.journal(makeRI(4), 1, 1, 1,
QJMTestUtil.createTxnData(1, 1));
@ -323,7 +371,8 @@ public void testStartLogSegmentWhenAlreadyExists() throws Exception {
QJMTestUtil.createTxnData(2, 3));
try {
journal.startLogSegment(makeRI(6), 1);
journal.startLogSegment(makeRI(6), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Did not fail to start log segment which would overwrite " +
"an existing one");
} catch (IllegalStateException ise) {
@ -335,7 +384,8 @@ public void testStartLogSegmentWhenAlreadyExists() throws Exception {
// Ensure that we cannot overwrite a finalized segment
try {
journal.startLogSegment(makeRI(8), 1);
journal.startLogSegment(makeRI(8), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
fail("Did not fail to start log segment which would overwrite " +
"an existing one");
} catch (IllegalStateException ise) {
@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.server.Journal;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -111,7 +112,7 @@ public void testJournal() throws Exception {
conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1).get();
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
metrics = MetricsAsserts.getMetrics(
@ -136,7 +137,7 @@ public void testJournal() throws Exception {
public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1).get();
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get();
// Switch to a new epoch without closing earlier segment
@ -152,7 +153,7 @@ public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
assertEquals(1, response.getLastSegmentTxId());
// Start a segment but don't write anything, check newEpoch segment info
ch.startLogSegment(3).get();
ch.startLogSegment(3, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
response = ch.newEpoch(4).get();
ch.setEpoch(4);
// Because the new segment is empty, it is equivalent to not having
@ -181,7 +182,7 @@ public void testHttpServer() throws Exception {
conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1).get();
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
ch.sendEdits(1L, 1, 3, EDITS_DATA).get();
ch.finalizeLogSegment(1, 3).get();
@ -233,7 +234,7 @@ public void testAcceptRecoveryBehavior() throws Exception {
// Make a log segment, and prepare again -- this time should see the
// segment existing.
ch.startLogSegment(1L).get();
ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
prep = ch.prepareRecovery(1L).get();
@ -322,7 +323,7 @@ private void doPerfTest(int editsSize, int numEdits) throws Exception {
byte[] data = new byte[editsSize];
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1).get();
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
Stopwatch sw = new Stopwatch().start();
for (int i = 1; i < numEdits; i++) {
@ -67,7 +67,7 @@ public void setUp() throws IOException {
// Return a good software version.
doReturn(VersionInfo.getVersion()).when(fakeNsInfo).getSoftwareVersion();
// Return a good layout version for now.
doReturn(HdfsConstants.DATANODE_LAYOUT_VERSION).when(fakeNsInfo)
doReturn(HdfsConstants.NAMENODE_LAYOUT_VERSION).when(fakeNsInfo)
.getLayoutVersion();
DatanodeProtocolClientSideTranslatorPB fakeDnProt =
@ -27,6 +27,7 @@
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@ -68,6 +69,8 @@
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
@ -76,6 +79,8 @@
import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.Mockito;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -88,7 +93,42 @@ public class TestEditLog {
static {
((Log4JLogger)FSEditLog.LOG).getLogger().setLevel(Level.ALL);
}
/**
* A garbage mkdir op used for testing
* {@link EditLogFileInputStream#scanEditLog(File)}
*/
public static class GarbageMkdirOp extends FSEditLogOp {
public GarbageMkdirOp() {
super(FSEditLogOpCodes.OP_MKDIR);
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
throw new IOException("cannot decode GarbageMkdirOp");
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
// write some garbage content
Random random = new Random();
byte[] content = new byte[random.nextInt(16) + 1];
random.nextBytes(content);
out.write(content);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
throw new UnsupportedOperationException(
"Not supported for GarbageMkdirOp");
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
throw new UnsupportedOperationException(
"Not supported for GarbageMkdirOp");
}
}
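As a usage note, a hedged sketch of how GarbageMkdirOp might be driven through the stream APIs exercised in this file (the local logFile and transaction ids are hypothetical, the try/finally cleanup is omitted, and same-package access to the stream methods is assumed):
// Hypothetical sketch: persist undecodable ops with valid framing.
EditLogFileOutputStream elfos =
    new EditLogFileOutputStream(new Configuration(), logFile, 0);
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
for (long txid = 1; txid <= 5; txid++) {
  GarbageMkdirOp op = new GarbageMkdirOp();
  op.setTransactionId(txid); // header and txid framing stay well-formed
  elfos.write(op);           // only the op body is random bytes
}
elfos.setReadyToFlush();
elfos.flushAndSync(true);
elfos.close();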
static final Log LOG = LogFactory.getLog(TestEditLog.class);
static final int NUM_DATA_NODES = 0;
@ -767,7 +807,7 @@ private void doTestCrashRecoveryEmptyLog(boolean inBothDirs,
EditLogFileOutputStream stream = new EditLogFileOutputStream(conf, log, 1024);
try {
stream.create();
stream.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
if (!inBothDirs) {
break;
}
@ -820,7 +860,7 @@ public EditLogByteInputStream(byte[] data) throws IOException {
BufferedInputStream bin = new BufferedInputStream(input);
DataInputStream in = new DataInputStream(bin);
version = EditLogFileInputStream.readLogVersion(in);
version = EditLogFileInputStream.readLogVersion(in, true);
tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
in = new DataInputStream(tracker);
@ -853,7 +893,7 @@ protected FSEditLogOp nextOp() throws IOException {
}
@Override
public int getVersion() throws IOException {
public int getVersion(boolean verifyVersion) throws IOException {
return version;
}
@ -1237,7 +1277,7 @@ static void validateNoCrash(byte garbage[]) throws IOException {
EditLogFileInputStream elfis = null;
try {
elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
elfos.create();
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
elfos.writeRaw(garbage, 0, garbage.length);
elfos.setReadyToFlush();
elfos.flushAndSync(true);
@ -81,7 +81,7 @@ public void testRawWrites() throws IOException {
TEST_EDITS, 0);
try {
byte[] small = new byte[] { 1, 2, 3, 4, 5, 8, 7 };
elos.create();
elos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// The first (small) write we make extends the file by 1 MB due to
// preallocation.
elos.writeRaw(small, 0, small.length);
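One way to observe the preallocation claim in the comment above, as a hedged sketch (TEST_EDITS is the path used earlier in this test; the 1 MB figure is an assumption about the preallocation chunk size):
// Hypothetical follow-up: after flushing the small write, the on-disk file
// should already be about 1 MB due to preallocation, not small.length bytes.
elos.setReadyToFlush();
elos.flushAndSync(true);
Assert.assertTrue(new File(TEST_EDITS).length() >= 1024 * 1024);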
@ -22,7 +22,6 @@
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FilenameFilter;
@ -43,7 +42,6 @@
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
@ -223,7 +221,7 @@ public void testInprogressRecoveryAll() throws IOException {
*/
private void corruptAfterStartSegment(File f) throws IOException {
RandomAccessFile raf = new RandomAccessFile(f, "rw");
raf.seek(0x16); // skip version and first transaction and a bit of next transaction
raf.seek(0x20); // skip version and first transaction and a bit of next transaction
for (int i = 0; i < 1000; i++) {
raf.writeInt(0xdeadbeef);
}
@ -160,7 +160,8 @@ public void format(NamespaceInfo nsInfo) throws IOException {
}
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
throws IOException {
return mock(EditLogOutputStream.class);
}
@ -73,7 +73,7 @@ static void runEditLogTest(EditLogTestSetup elts) throws IOException {
EditLogFileInputStream elfis = null;
try {
elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
elfos.create();
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
elts.addTransactionsToLog(elfos, cache);
elfos.setReadyToFlush();
@ -274,7 +274,7 @@ public Set<Long> getValidTxIds() {
}
public int getMaxOpSize() {
return 36;
return 40;
}
}
@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
@ -444,7 +445,8 @@ private static void createEmptyInProgressEditLog(MiniDFSCluster cluster,
if (writeHeader) {
DataOutputStream out = new DataOutputStream(new FileOutputStream(
inProgressFile));
EditLogFileOutputStream.writeHeader(out);
EditLogFileOutputStream.writeHeader(
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION, out);
out.close();
}
}
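For illustration, a hedged round-trip sketch tying this hunk to the readLogVersion change shown earlier in this patch; the in-memory streams are assumptions, and same-package access to the static helpers is assumed:
// Hypothetical sketch: write a header with an explicit layout version and
// read it back. With verifyVersion=false, readLogVersion would tolerate a
// layout version newer than the current one instead of rejecting it.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
EditLogFileOutputStream.writeHeader(
    NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION, new DataOutputStream(bos));
DataInputStream in =
    new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
int version = EditLogFileInputStream.readLogVersion(in, true);
Assert.assertEquals(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION, version);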
@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<EDITS>
<EDITS_VERSION>-55</EDITS_VERSION>
<EDITS_VERSION>-56</EDITS_VERSION>
<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
<DATA>
@ -13,8 +13,8 @@
<TXID>2</TXID>
<DELEGATION_KEY>
<KEY_ID>1</KEY_ID>
<EXPIRY_DATE>1393648283650</EXPIRY_DATE>
<KEY>76e6d2854a753680</KEY>
<EXPIRY_DATE>1394849922137</EXPIRY_DATE>
<KEY>37e1a64049bbef35</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -24,8 +24,8 @@
<TXID>3</TXID>
<DELEGATION_KEY>
<KEY_ID>2</KEY_ID>
<EXPIRY_DATE>1393648283653</EXPIRY_DATE>
<KEY>939fb7b875c956cd</KEY>
<EXPIRY_DATE>1394849922140</EXPIRY_DATE>
<KEY>7c0bf5039242fc54</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -37,18 +37,18 @@
<INODEID>16386</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084379</MTIME>
<ATIME>1392957084379</ATIME>
<MTIME>1394158722811</MTIME>
<ATIME>1394158722811</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1178237747_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>7</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>6</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -59,13 +59,13 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084397</MTIME>
<ATIME>1392957084379</ATIME>
<MTIME>1394158722832</MTIME>
<ATIME>1394158722811</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -78,9 +78,9 @@
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1392957084400</TIMESTAMP>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>9</RPC_CALLID>
<TIMESTAMP>1394158722836</TIMESTAMP>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>8</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -89,9 +89,9 @@
<TXID>7</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_moved</PATH>
<TIMESTAMP>1392957084413</TIMESTAMP>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>10</RPC_CALLID>
<TIMESTAMP>1394158722842</TIMESTAMP>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>9</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -101,9 +101,9 @@
<LENGTH>0</LENGTH>
<INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH>
<TIMESTAMP>1392957084419</TIMESTAMP>
<TIMESTAMP>1394158722848</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
@ -136,8 +136,8 @@
<TXID>12</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>15</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>14</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -147,8 +147,8 @@
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>15</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -157,8 +157,8 @@
<TXID>14</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -169,18 +169,18 @@
<INODEID>16388</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084440</MTIME>
<ATIME>1392957084440</ATIME>
<MTIME>1394158722872</MTIME>
<ATIME>1394158722872</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1178237747_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -191,13 +191,13 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084441</MTIME>
<ATIME>1392957084440</ATIME>
<MTIME>1394158722874</MTIME>
<ATIME>1394158722872</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -253,10 +253,10 @@
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1392957084455</TIMESTAMP>
<TIMESTAMP>1394158722890</TIMESTAMP>
<OPTIONS>NONE</OPTIONS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>25</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>24</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -267,18 +267,18 @@
<INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084459</MTIME>
<ATIME>1392957084459</ATIME>
<MTIME>1394158722895</MTIME>
<ATIME>1394158722895</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1178237747_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>27</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>26</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -383,8 +383,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084525</MTIME>
<ATIME>1392957084459</ATIME>
<MTIME>1394158722986</MTIME>
<ATIME>1394158722895</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -404,7 +404,7 @@
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -418,18 +418,18 @@
<INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084527</MTIME>
<ATIME>1392957084527</ATIME>
<MTIME>1394158722989</MTIME>
<ATIME>1394158722989</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1178237747_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>40</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>39</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -534,8 +534,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084542</MTIME>
<ATIME>1392957084527</ATIME>
<MTIME>1394158723010</MTIME>
<ATIME>1394158722989</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -555,7 +555,7 @@
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -569,18 +569,18 @@
<INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084544</MTIME>
<ATIME>1392957084544</ATIME>
<MTIME>1394158723012</MTIME>
<ATIME>1394158723012</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1178237747_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>52</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>51</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -685,8 +685,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084559</MTIME>
<ATIME>1392957084544</ATIME>
<MTIME>1394158723035</MTIME>
<ATIME>1394158723012</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -706,7 +706,7 @@
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -718,13 +718,13 @@
<TXID>56</TXID>
<LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG>
<TIMESTAMP>1392957084561</TIMESTAMP>
<TIMESTAMP>1394158723039</TIMESTAMP>
<SOURCES>
<SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2>
</SOURCES>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>63</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>62</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -735,15 +735,15 @@
<INODEID>16392</INODEID>
<PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE>
<MTIME>1392957084564</MTIME>
<ATIME>1392957084564</ATIME>
<MTIME>1394158723044</MTIME>
<ATIME>1394158723044</ATIME>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>63</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -754,18 +754,18 @@
<INODEID>16393</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957084567</MTIME>
<ATIME>1392957084567</ATIME>
<MTIME>1394158723047</MTIME>
<ATIME>1394158723047</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1178237747_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>65</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -821,7 +821,7 @@
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>64</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-1178237747_1</LEASEHOLDER>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_221786725_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA>
@ -834,8 +834,8 @@
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1392957087263</MTIME>
<ATIME>1392957084567</ATIME>
<MTIME>1394158725708</MTIME>
<ATIME>1394158723047</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -845,7 +845,7 @@
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>szetszwo</USERNAME>
<USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -856,13 +856,13 @@
<DATA>
<TXID>66</TXID>
<POOLNAME>pool1</POOLNAME>
<OWNERNAME>szetszwo</OWNERNAME>
<OWNERNAME>jing</OWNERNAME>
<GROUPNAME>staff</GROUPNAME>
<MODE>493</MODE>
<LIMIT>9223372036854775807</LIMIT>
<MAXRELATIVEEXPIRY>2305843009213693951</MAXRELATIVEEXPIRY>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>72</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>71</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -871,8 +871,8 @@
<TXID>67</TXID>
<POOLNAME>pool1</POOLNAME>
<LIMIT>99</LIMIT>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>73</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>72</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -883,9 +883,9 @@
<PATH>/path</PATH>
<REPLICATION>1</REPLICATION>
<POOL>pool1</POOL>
<EXPIRATION>2305844402170781554</EXPIRATION>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>74</RPC_CALLID>
<EXPIRATION>2305844403372420029</EXPIRATION>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>73</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -894,8 +894,8 @@
<TXID>69</TXID>
<ID>1</ID>
<REPLICATION>2</REPLICATION>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>75</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>74</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -903,8 +903,8 @@
<DATA>
<TXID>70</TXID>
<ID>1</ID>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>76</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>75</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -912,8 +912,8 @@
<DATA>
<TXID>71</TXID>
<POOLNAME>pool1</POOLNAME>
<RPC_CLIENTID>ad7d1b9e-e5d3-4d8d-ae1a-060f579be11e</RPC_CLIENTID>
<RPC_CALLID>77</RPC_CALLID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>76</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -927,14 +927,14 @@
<OPCODE>OP_ROLLING_UPGRADE_START</OPCODE>
<DATA>
<TXID>73</TXID>
<STARTTIME>1392957087621</STARTTIME>
<STARTTIME>1394158726098</STARTTIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_FINALIZE</OPCODE>
<DATA>
<TXID>74</TXID>
<FINALIZETIME>1392957087621</FINALIZETIME>
<FINALIZETIME>1394158726098</FINALIZETIME>
</DATA>
</RECORD>
<RECORD>