HDFS-8846. Add a unit test for INotify functionality across a layout version upgrade (Zhe Zhang via Colin P. McCabe)
(cherry picked from commita4d9acc51d
) (cherry picked from commit9264b7e119
)
This commit is contained in:
parent
2e86d1f254
commit
a976acc02d
|
@ -163,6 +163,9 @@ Release 2.6.1 - UNRELEASED
|
||||||
HDFS-7446. HDFS inotify should have the ability to determine what txid it
|
HDFS-7446. HDFS inotify should have the ability to determine what txid it
|
||||||
has read up to (cmccabe)
|
has read up to (cmccabe)
|
||||||
|
|
||||||
|
HDFS-8846. Add a unit test for INotify functionality across a layout
|
||||||
|
version upgrade (Zhe Zhang via Colin P. McCabe)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class TestDFSInotifyEventInputStream {
|
||||||
private static final Log LOG = LogFactory.getLog(
|
private static final Log LOG = LogFactory.getLog(
|
||||||
TestDFSInotifyEventInputStream.class);
|
TestDFSInotifyEventInputStream.class);
|
||||||
|
|
||||||
private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
|
public static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
|
||||||
throws IOException, MissingEventsException {
|
throws IOException, MissingEventsException {
|
||||||
EventBatch batch = null;
|
EventBatch batch = null;
|
||||||
while ((batch = eis.poll()) == null);
|
while ((batch = eis.poll()) == null);
|
||||||
|
|
|
@ -28,21 +28,14 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FilenameFilter;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.regex.Matcher;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.inotify.Event;
|
|
||||||
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||||
|
@ -50,11 +43,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
|
import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
@ -62,8 +50,6 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
|
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
|
||||||
|
@ -466,67 +452,6 @@ public class TestDFSUpgrade {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testPreserveEditLogs() throws Exception {
|
|
||||||
conf = new HdfsConfiguration();
|
|
||||||
conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
|
|
||||||
String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
|
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
|
|
||||||
|
|
||||||
log("Normal NameNode upgrade", 1);
|
|
||||||
File[] created =
|
|
||||||
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
|
|
||||||
for (final File createdDir : created) {
|
|
||||||
String[] fileNameList = createdDir.list(EditLogsFilter.INSTANCE);
|
|
||||||
for (String fileName : fileNameList) {
|
|
||||||
String tmpFileName = fileName + ".tmp";
|
|
||||||
File existingFile = new File(createdDir, fileName);
|
|
||||||
File tmpFile = new File(createdDir, tmpFileName);
|
|
||||||
Files.move(existingFile.toPath(), tmpFile.toPath());
|
|
||||||
File newFile = new File(createdDir, fileName);
|
|
||||||
Preconditions.checkState(newFile.createNewFile(),
|
|
||||||
"Cannot create new edits log file in " + createdDir);
|
|
||||||
EditLogFileInputStream in = new EditLogFileInputStream(tmpFile,
|
|
||||||
HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
|
|
||||||
false);
|
|
||||||
EditLogFileOutputStream out = new EditLogFileOutputStream(conf, newFile,
|
|
||||||
(int)tmpFile.length());
|
|
||||||
out.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1);
|
|
||||||
FSEditLogOp logOp = in.readOp();
|
|
||||||
while (logOp != null) {
|
|
||||||
out.write(logOp);
|
|
||||||
logOp = in.readOp();
|
|
||||||
}
|
|
||||||
out.setReadyToFlush();
|
|
||||||
out.flushAndSync(true);
|
|
||||||
out.close();
|
|
||||||
Files.delete(tmpFile.toPath());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cluster = createCluster();
|
|
||||||
|
|
||||||
DFSInotifyEventInputStream ieis =
|
|
||||||
cluster.getFileSystem().getInotifyEventStream(0);
|
|
||||||
EventBatch batch = ieis.poll();
|
|
||||||
Event[] events = batch.getEvents();
|
|
||||||
assertTrue("Should be able to get transactions before the upgrade.",
|
|
||||||
events.length > 0);
|
|
||||||
assertEquals(events[0].getEventType(), Event.EventType.CREATE);
|
|
||||||
assertEquals(((CreateEvent) events[0]).getPath(), "/TestUpgrade");
|
|
||||||
cluster.shutdown();
|
|
||||||
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
|
|
||||||
}
|
|
||||||
|
|
||||||
private enum EditLogsFilter implements FilenameFilter {
|
|
||||||
INSTANCE;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean accept(File dir, String name) {
|
|
||||||
return name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestDFSUpgrade t = new TestDFSUpgrade();
|
TestDFSUpgrade t = new TestDFSUpgrade();
|
||||||
TestDFSUpgrade.initialize();
|
TestDFSUpgrade.initialize();
|
||||||
|
|
|
@ -18,10 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
@ -40,10 +36,13 @@ import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.inotify.Event;
|
||||||
|
import org.apache.hadoop.hdfs.inotify.EventBatch;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
|
import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
@ -51,6 +50,9 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests data transfer protocol handling in the Datanode. It sends
|
* This tests data transfer protocol handling in the Datanode. It sends
|
||||||
* various forms of wrong data and verifies that Datanode handles it well.
|
* various forms of wrong data and verifies that Datanode handles it well.
|
||||||
|
@ -74,6 +76,7 @@ public class TestDFSUpgradeFromImage {
|
||||||
private static final String HADOOP023_RESERVED_IMAGE =
|
private static final String HADOOP023_RESERVED_IMAGE =
|
||||||
"hadoop-0.23-reserved.tgz";
|
"hadoop-0.23-reserved.tgz";
|
||||||
private static final String HADOOP2_RESERVED_IMAGE = "hadoop-2-reserved.tgz";
|
private static final String HADOOP2_RESERVED_IMAGE = "hadoop-2-reserved.tgz";
|
||||||
|
private static final String HADOOP252_IMAGE = "hadoop-252-dfs-dir.tgz";
|
||||||
|
|
||||||
private static class ReferenceFileInfo {
|
private static class ReferenceFileInfo {
|
||||||
String path;
|
String path;
|
||||||
|
@ -620,4 +623,100 @@ public class TestDFSUpgradeFromImage {
|
||||||
numDataNodes(1).enableManagedDfsDirsRedundancy(false).
|
numDataNodes(1).enableManagedDfsDirsRedundancy(false).
|
||||||
manageDataDfsDirs(false), null);
|
manageDataDfsDirs(false), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreserveEditLogs() throws Exception {
|
||||||
|
unpackStorage(HADOOP252_IMAGE, HADOOP_DFS_DIR_TXT);
|
||||||
|
/**
|
||||||
|
* The pre-created image has the following edits:
|
||||||
|
* mkdir /input; mkdir /input/dir1~5
|
||||||
|
* copyFromLocal randome_file_1 /input/dir1
|
||||||
|
* copyFromLocal randome_file_2 /input/dir2
|
||||||
|
* mv /input/dir1/randome_file_1 /input/dir3/randome_file_3
|
||||||
|
* rmdir /input/dir1
|
||||||
|
*/
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
|
||||||
|
.format(false)
|
||||||
|
.manageDataDfsDirs(false)
|
||||||
|
.manageNameDfsDirs(false)
|
||||||
|
.startupOption(StartupOption.UPGRADE)
|
||||||
|
.build();
|
||||||
|
DFSInotifyEventInputStream ieis =
|
||||||
|
cluster.getFileSystem().getInotifyEventStream(0);
|
||||||
|
|
||||||
|
EventBatch batch;
|
||||||
|
Event.CreateEvent ce;
|
||||||
|
Event.RenameEvent re;
|
||||||
|
|
||||||
|
// mkdir /input
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
|
||||||
|
ce = (Event.CreateEvent) batch.getEvents()[0];
|
||||||
|
assertEquals(ce.getPath(), "/input");
|
||||||
|
|
||||||
|
// mkdir /input/dir1~5
|
||||||
|
for (int i = 1; i <= 5; i++) {
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
|
||||||
|
ce = (Event.CreateEvent) batch.getEvents()[0];
|
||||||
|
assertEquals(ce.getPath(), "/input/dir" + i);
|
||||||
|
}
|
||||||
|
// copyFromLocal randome_file_1~2 /input/dir1~2
|
||||||
|
for (int i = 1; i <= 2; i++) {
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
if (batch.getEvents()[0].getEventType() != Event.EventType.CREATE) {
|
||||||
|
FSImage.LOG.debug("");
|
||||||
|
}
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
|
||||||
|
|
||||||
|
// copyFromLocal randome_file_1 /input/dir1, CLOSE
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
|
||||||
|
|
||||||
|
// copyFromLocal randome_file_1 /input/dir1, CLOSE
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() ==
|
||||||
|
Event.EventType.RENAME);
|
||||||
|
re = (Event.RenameEvent) batch.getEvents()[0];
|
||||||
|
assertEquals(re.getDstPath(), "/input/dir" + i + "/randome_file_" + i);
|
||||||
|
}
|
||||||
|
|
||||||
|
// mv /input/dir1/randome_file_1 /input/dir3/randome_file_3
|
||||||
|
long txIDBeforeRename = batch.getTxid();
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
|
||||||
|
re = (Event.RenameEvent) batch.getEvents()[0];
|
||||||
|
assertEquals(re.getDstPath(), "/input/dir3/randome_file_3");
|
||||||
|
|
||||||
|
|
||||||
|
// rmdir /input/dir1
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK);
|
||||||
|
assertEquals(((Event.UnlinkEvent) batch.getEvents()[0]).getPath(),
|
||||||
|
"/input/dir1");
|
||||||
|
long lastTxID = batch.getTxid();
|
||||||
|
|
||||||
|
// Start inotify from the tx before rename /input/dir1/randome_file_1
|
||||||
|
ieis = cluster.getFileSystem().getInotifyEventStream(txIDBeforeRename);
|
||||||
|
batch = TestDFSInotifyEventInputStream.waitForNextEvents(ieis);
|
||||||
|
assertEquals(1, batch.getEvents().length);
|
||||||
|
assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
|
||||||
|
re = (Event.RenameEvent) batch.getEvents()[0];
|
||||||
|
assertEquals(re.getDstPath(), "/input/dir3/randome_file_3");
|
||||||
|
|
||||||
|
// Try to read beyond available edits
|
||||||
|
ieis = cluster.getFileSystem().getInotifyEventStream(lastTxID + 1);
|
||||||
|
assertNull(ieis.poll());
|
||||||
|
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Binary file not shown.
Loading…
Reference in New Issue