HDFS-10463. TestRollingFileSystemSinkWithHdfs needs some cleanup. (Daniel Templeton via kasha)
This commit is contained in:
parent
72773f8ea8
commit
b4c8729cf9
|
@ -221,8 +221,12 @@ public class RollingFileSystemSinkTestBase {
|
||||||
mm1.testMetric2.incr(2);
|
mm1.testMetric2.incr(2);
|
||||||
|
|
||||||
ms.publishMetricsNow(); // publish the metrics
|
ms.publishMetricsNow(); // publish the metrics
|
||||||
ms.stop();
|
|
||||||
ms.shutdown();
|
try {
|
||||||
|
ms.stop();
|
||||||
|
} finally {
|
||||||
|
ms.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
return readLogFile(path, then, count);
|
return readLogFile(path, then, count);
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
public void testSilentAppend() throws Exception {
|
public void testSilentAppend() throws Exception {
|
||||||
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
||||||
|
|
||||||
assertExtraContents(doAppendTest(path, false, true, 1));
|
assertExtraContents(doAppendTest(path, true, true, 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -159,8 +159,11 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
assertTrue("No exception was generated while writing metrics "
|
assertTrue("No exception was generated while writing metrics "
|
||||||
+ "even though HDFS was unavailable", MockSink.errored);
|
+ "even though HDFS was unavailable", MockSink.errored);
|
||||||
|
|
||||||
ms.stop();
|
try {
|
||||||
ms.shutdown();
|
ms.stop();
|
||||||
|
} finally {
|
||||||
|
ms.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -183,13 +186,14 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ms.stop();
|
ms.stop();
|
||||||
|
|
||||||
fail("No exception was generated while stopping sink "
|
fail("No exception was generated while stopping sink "
|
||||||
+ "even though HDFS was unavailable");
|
+ "even though HDFS was unavailable");
|
||||||
} catch (MetricsException ex) {
|
} catch (MetricsException ex) {
|
||||||
// Expected
|
// Expected
|
||||||
|
} finally {
|
||||||
|
ms.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
ms.shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -215,8 +219,11 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
+ "while HDFS was unavailable, even though the sink is set to "
|
+ "while HDFS was unavailable, even though the sink is set to "
|
||||||
+ "ignore errors", MockSink.errored);
|
+ "ignore errors", MockSink.errored);
|
||||||
|
|
||||||
ms.stop();
|
try {
|
||||||
ms.shutdown();
|
ms.stop();
|
||||||
|
} finally {
|
||||||
|
ms.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -237,13 +244,15 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
shutdownHdfs();
|
shutdownHdfs();
|
||||||
MockSink.errored = false;
|
MockSink.errored = false;
|
||||||
|
|
||||||
ms.stop();
|
try {
|
||||||
|
ms.stop();
|
||||||
|
|
||||||
assertFalse("An exception was generated stopping sink "
|
assertFalse("An exception was generated stopping sink "
|
||||||
+ "while HDFS was unavailable, even though the sink is set to "
|
+ "while HDFS was unavailable, even though the sink is set to "
|
||||||
+ "ignore errors", MockSink.errored);
|
+ "ignore errors", MockSink.errored);
|
||||||
|
} finally {
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -272,29 +281,37 @@ public class TestRollingFileSystemSinkWithHdfs
|
||||||
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
// Sleep until the flusher has run. This should never actually need to
|
try {
|
||||||
// sleep, but the sleep is here to make sure this test isn't flakey.
|
// Sleep until the flusher has run. This should never actually need to
|
||||||
while (!RollingFileSystemSink.hasFlushed) {
|
// sleep, but the sleep is here to make sure this test isn't flakey.
|
||||||
Thread.sleep(10L);
|
while (!RollingFileSystemSink.hasFlushed) {
|
||||||
|
Thread.sleep(10L);
|
||||||
|
|
||||||
if (++count > 1000) {
|
if (++count > 1000) {
|
||||||
fail("Flush thread did not run within 10 seconds");
|
fail("Flush thread did not run within 10 seconds");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Calendar now = Calendar.getInstance();
|
||||||
|
Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
|
||||||
|
FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
|
||||||
|
Path currentFile =
|
||||||
|
findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
|
||||||
|
FileStatus status = fs.getFileStatus(currentFile);
|
||||||
|
|
||||||
|
// Each metrics record is 118+ bytes, depending on hostname
|
||||||
|
assertTrue("The flusher thread didn't flush the log contents. Expected "
|
||||||
|
+ "at least 236 bytes in the log file, but got " + status.getLen(),
|
||||||
|
status.getLen() >= 236);
|
||||||
|
} finally {
|
||||||
|
RollingFileSystemSink.forceFlush = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
ms.stop();
|
||||||
|
} finally {
|
||||||
|
ms.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Calendar now = Calendar.getInstance();
|
|
||||||
Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
|
|
||||||
FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
|
|
||||||
Path currentFile =
|
|
||||||
findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
|
|
||||||
FileStatus status = fs.getFileStatus(currentFile);
|
|
||||||
|
|
||||||
// Each metrics record is 118+ bytes, depending on hostname
|
|
||||||
assertTrue("The flusher thread didn't flush the log contents. Expected "
|
|
||||||
+ "at least 236 bytes in the log file, but got " + status.getLen(),
|
|
||||||
status.getLen() >= 236);
|
|
||||||
|
|
||||||
ms.stop();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue