HDDS-2164 : om.db.checkpoints is getting filling up fast. (#1536)

This commit is contained in:
avijayanhwx 2019-10-04 12:44:21 -07:00 committed by Anu Engineer
parent 10bdc592d6
commit f3eaa84f9d
10 changed files with 240 additions and 125 deletions

View File

@ -83,7 +83,7 @@ public class RDBCheckpointManager {
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
LOG.debug("Created checkpoint at " + checkpointPath.toString() + " in "
LOG.info("Created checkpoint at " + checkpointPath.toString() + " in "
+ duration + " milliseconds");
return new RocksDBCheckpoint(

View File

@ -76,7 +76,8 @@ public class RocksDBCheckpoint implements DBCheckpoint {
@Override
public void cleanupCheckpoint() throws IOException {
LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
LOG.info("Cleaning up RocksDB checkpoint at " +
checkpointLocation.toString());
FileUtils.deleteDirectory(checkpointLocation.toFile());
}

View File

@ -18,14 +18,13 @@
package org.apache.hadoop.ozone;
import com.google.common.base.Joiner;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
@ -34,16 +33,22 @@ import java.security.SecureRandom;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.zip.GZIPOutputStream;
import java.util.stream.Collectors;
import com.google.common.base.Strings;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@ -346,61 +351,51 @@ public final class OmUtils {
}
/**
* Given a source directory, create a tar.gz file from it.
*
* @param sourcePath the path to the directory to be archived.
* @return tar.gz file
* Write OM DB Checkpoint to an output stream as a compressed file (tgz).
* @param checkpoint checkpoint file
* @param destination desination output stream.
* @throws IOException
*/
public static File createTarFile(Path sourcePath) throws IOException {
TarArchiveOutputStream tarOs = null;
try {
String sourceDir = sourcePath.toString();
String fileName = sourceDir.concat(".tar.gz");
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
GZIPOutputStream gzipOutputStream =
new GZIPOutputStream(new BufferedOutputStream(fileOutputStream));
tarOs = new TarArchiveOutputStream(gzipOutputStream);
File folder = new File(sourceDir);
File[] filesInDir = folder.listFiles();
if (filesInDir != null) {
for (File file : filesInDir) {
addFilesToArchive(file.getName(), file, tarOs);
public static void writeOmDBCheckpointToStream(DBCheckpoint checkpoint,
OutputStream destination)
throws IOException {
try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
.createCompressorOutputStream(CompressorStreamFactory.GZIP,
destination)) {
try (ArchiveOutputStream archiveOutputStream =
new TarArchiveOutputStream(gzippedOut)) {
Path checkpointPath = checkpoint.getCheckpointLocation();
for (Path path : Files.list(checkpointPath)
.collect(Collectors.toList())) {
if (path != null) {
Path fileName = path.getFileName();
if (fileName != null) {
includeFile(path.toFile(), fileName.toString(),
archiveOutputStream);
}
}
return new File(fileName);
} finally {
try {
org.apache.hadoop.io.IOUtils.closeStream(tarOs);
} catch (Exception e) {
LOG.error("Exception encountered when closing " +
"TAR file output stream: " + e);
}
}
} catch (CompressorException e) {
throw new IOException(
"Can't compress the checkpoint: " +
checkpoint.getCheckpointLocation(), e);
}
}
private static void addFilesToArchive(String source, File file,
TarArchiveOutputStream
tarFileOutputStream)
private static void includeFile(File file, String entryName,
ArchiveOutputStream archiveOutputStream)
throws IOException {
tarFileOutputStream.putArchiveEntry(new TarArchiveEntry(file, source));
if (file.isFile()) {
FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream =
new BufferedInputStream(fileInputStream);
IOUtils.copy(bufferedInputStream, tarFileOutputStream);
tarFileOutputStream.closeArchiveEntry();
fileInputStream.close();
} else if (file.isDirectory()) {
tarFileOutputStream.closeArchiveEntry();
File[] filesInDir = file.listFiles();
if (filesInDir != null) {
for (File cFile : filesInDir) {
addFilesToArchive(cFile.getAbsolutePath(), cFile,
tarFileOutputStream);
}
}
ArchiveEntry archiveEntry =
archiveOutputStream.createArchiveEntry(file, entryName);
archiveOutputStream.putArchiveEntry(archiveEntry);
try (FileInputStream fis = new FileInputStream(file)) {
IOUtils.copy(fis, archiveOutputStream);
}
archiveOutputStream.closeArchiveEntry();
}
/**

View File

@ -19,31 +19,39 @@
package org.apache.hadoop.ozone;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link OmUtils}.
*/
public class TestOmUtils {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@Rule
public Timeout timeout = new Timeout(60_000);
@ -96,22 +104,13 @@ public class TestOmUtils {
}
@Test
public void testCreateTarFile() throws Exception {
public void testWriteCheckpointToOutputStream() throws Exception {
File tempSnapshotDir = null;
FileInputStream fis = null;
FileOutputStream fos = null;
File tarFile = null;
try {
String testDirName = System.getProperty("java.io.tmpdir");
if (!testDirName.endsWith("/")) {
testDirName += "/";
}
testDirName += "TestCreateTarFile_Dir" + System.currentTimeMillis();
tempSnapshotDir = new File(testDirName);
tempSnapshotDir.mkdirs();
String testDirName = folder.newFolder().getAbsolutePath();
File file = new File(testDirName + "/temp1.txt");
FileWriter writer = new FileWriter(file);
writer.write("Test data 1");
@ -122,14 +121,60 @@ public class TestOmUtils {
writer.write("Test data 2");
writer.close();
tarFile = OmUtils.createTarFile(Paths.get(testDirName));
Assert.assertNotNull(tarFile);
File outputFile =
new File(Paths.get(testDirName, "output_file.tgz").toString());
TestDBCheckpoint dbCheckpoint = new TestDBCheckpoint(
Paths.get(testDirName));
OmUtils.writeOmDBCheckpointToStream(dbCheckpoint,
new FileOutputStream(outputFile));
assertNotNull(outputFile);
} finally {
IOUtils.closeStream(fis);
IOUtils.closeStream(fos);
FileUtils.deleteDirectory(tempSnapshotDir);
FileUtils.deleteQuietly(tarFile);
}
}
}
class TestDBCheckpoint implements DBCheckpoint {
private Path checkpointFile;
TestDBCheckpoint(Path checkpointFile) {
this.checkpointFile = checkpointFile;
}
@Override
public Path getCheckpointLocation() {
return checkpointFile;
}
@Override
public long getCheckpointTimestamp() {
return 0;
}
@Override
public long getLatestSequenceNumber() {
return 0;
}
@Override
public long checkpointCreationTimeTaken() {
return 0;
}
@Override
public void cleanupCheckpoint() throws IOException {
FileUtils.deleteDirectory(checkpointFile.toFile());
}
@Override
public void setRatisSnapshotIndex(long omRatisSnapshotIndex) {
}
@Override
public long getRatisSnapshotIndex() {
return 0;
}
}

View File

@ -154,8 +154,6 @@ public class TestOMDbCheckpointServlet {
Assert.assertTrue(
omMetrics.getLastCheckpointCreationTimeTaken() == 0);
Assert.assertTrue(
omMetrics.getLastCheckpointTarOperationTimeTaken() == 0);
Assert.assertTrue(
omMetrics.getLastCheckpointStreamingTimeTaken() == 0);
@ -164,8 +162,6 @@ public class TestOMDbCheckpointServlet {
Assert.assertTrue(tempFile.length() > 0);
Assert.assertTrue(
omMetrics.getLastCheckpointCreationTimeTaken() > 0);
Assert.assertTrue(
omMetrics.getLastCheckpointTarOperationTimeTaken() > 0);
Assert.assertTrue(
omMetrics.getLastCheckpointStreamingTimeTaken() > 0);
} finally {

View File

@ -23,9 +23,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
import static org.apache.hadoop.ozone.OzoneConsts.
OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
@ -34,12 +33,9 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.utils.db.DBStore;
@ -102,8 +98,7 @@ public class OMDBCheckpointServlet extends HttpServlet {
return;
}
FileInputStream checkpointFileInputStream = null;
File checkPointTarFile = null;
DBCheckpoint checkpoint = null;
try {
boolean flush = false;
@ -131,8 +126,8 @@ public class OMDBCheckpointServlet extends HttpServlet {
ratisSnapshotIndex = om.getRatisSnapshotIndex();
}
DBCheckpoint checkpoint = omDbStore.getCheckpoint(flush);
if (checkpoint == null) {
checkpoint = omDbStore.getCheckpoint(flush);
if (checkpoint == null || checkpoint.getCheckpointLocation() == null) {
LOG.error("Unable to process metadata snapshot request. " +
"Checkpoint request returned null.");
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
@ -141,49 +136,41 @@ public class OMDBCheckpointServlet extends HttpServlet {
omMetrics.setLastCheckpointCreationTimeTaken(
checkpoint.checkpointCreationTimeTaken());
Instant start = Instant.now();
checkPointTarFile = OmUtils.createTarFile(
checkpoint.getCheckpointLocation());
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
LOG.debug("Time taken to archive the checkpoint : " + duration +
" milliseconds");
LOG.info("Checkpoint Tar location = " +
checkPointTarFile.getAbsolutePath());
omMetrics.setLastCheckpointTarOperationTimeTaken(duration);
Path file = checkpoint.getCheckpointLocation().getFileName();
if (file == null) {
return;
}
response.setContentType("application/x-tgz");
response.setHeader("Content-Disposition",
"attachment; filename=\"" +
checkPointTarFile.getName() + "\"");
file.toString() + ".tgz\"");
// Ratis snapshot index used when downloading DB checkpoint to OM follower
response.setHeader(OM_RATIS_SNAPSHOT_INDEX,
String.valueOf(ratisSnapshotIndex));
checkpointFileInputStream = new FileInputStream(checkPointTarFile);
start = Instant.now();
TransferFsImage.copyFileToStream(response.getOutputStream(),
checkPointTarFile,
checkpointFileInputStream,
throttler);
end = Instant.now();
Instant start = Instant.now();
OmUtils.writeOmDBCheckpointToStream(checkpoint,
response.getOutputStream());
Instant end = Instant.now();
duration = Duration.between(start, end).toMillis();
LOG.debug("Time taken to write the checkpoint to response output " +
long duration = Duration.between(start, end).toMillis();
LOG.info("Time taken to write the checkpoint to response output " +
"stream: " + duration + " milliseconds");
omMetrics.setLastCheckpointStreamingTimeTaken(duration);
checkpoint.cleanupCheckpoint();
} catch (IOException e) {
} catch (Exception e) {
LOG.error(
"Unable to process metadata snapshot request. ", e);
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} finally {
if (checkPointTarFile != null) {
FileUtils.deleteQuietly(checkPointTarFile);
if (checkpoint != null) {
try {
checkpoint.cleanupCheckpoint();
} catch (IOException e) {
LOG.error("Error trying to clean checkpoint at {} .",
checkpoint.getCheckpointLocation().toString());
}
}
IOUtils.closeStream(checkpointFileInputStream);
}
}

View File

@ -123,7 +123,6 @@ public class OMMetrics {
// Metrics to track checkpointing statistics from last run.
private @Metric MutableGaugeLong lastCheckpointCreationTimeTaken;
private @Metric MutableGaugeLong lastCheckpointTarOperationTimeTaken;
private @Metric MutableGaugeLong lastCheckpointStreamingTimeTaken;
private @Metric MutableCounterLong numBucketS3Creates;
@ -511,10 +510,6 @@ public class OMMetrics {
this.lastCheckpointCreationTimeTaken.set(val);
}
public void setLastCheckpointTarOperationTimeTaken(long val) {
this.lastCheckpointTarOperationTimeTaken.set(val);
}
public void setLastCheckpointStreamingTimeTaken(long val) {
this.lastCheckpointStreamingTimeTaken.set(val);
}
@ -756,11 +751,6 @@ public class OMMetrics {
return lastCheckpointCreationTimeTaken.value();
}
@VisibleForTesting
public long getLastCheckpointTarOperationTimeTaken() {
return lastCheckpointTarOperationTimeTaken.value();
}
@VisibleForTesting
public long getLastCheckpointStreamingTimeTaken() {
return lastCheckpointStreamingTimeTaken.value();

View File

@ -32,9 +32,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
@ -83,6 +85,65 @@ public class ReconUtils {
return getOzoneMetaDirPath(conf);
}
/**
* Given a source directory, create a tar.gz file from it.
*
* @param sourcePath the path to the directory to be archived.
* @return tar.gz file
* @throws IOException
*/
public static File createTarFile(Path sourcePath) throws IOException {
TarArchiveOutputStream tarOs = null;
try {
String sourceDir = sourcePath.toString();
String fileName = sourceDir.concat(".tar.gz");
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
GZIPOutputStream gzipOutputStream =
new GZIPOutputStream(new BufferedOutputStream(fileOutputStream));
tarOs = new TarArchiveOutputStream(gzipOutputStream);
File folder = new File(sourceDir);
File[] filesInDir = folder.listFiles();
if (filesInDir != null) {
for (File file : filesInDir) {
addFilesToArchive(file.getName(), file, tarOs);
}
}
return new File(fileName);
} finally {
try {
org.apache.hadoop.io.IOUtils.closeStream(tarOs);
} catch (Exception e) {
LOG.error("Exception encountered when closing " +
"TAR file output stream: " + e);
}
}
}
private static void addFilesToArchive(String source, File file,
TarArchiveOutputStream
tarFileOutputStream)
throws IOException {
tarFileOutputStream.putArchiveEntry(new TarArchiveEntry(file, source));
if (file.isFile()) {
FileInputStream fileInputStream = new FileInputStream(file);
BufferedInputStream bufferedInputStream =
new BufferedInputStream(fileInputStream);
org.apache.commons.compress.utils.IOUtils.copy(bufferedInputStream,
tarFileOutputStream);
tarFileOutputStream.closeArchiveEntry();
fileInputStream.close();
} else if (file.isDirectory()) {
tarFileOutputStream.closeArchiveEntry();
File[] filesInDir = file.listFiles();
if (filesInDir != null) {
for (File cFile : filesInDir) {
addFilesToArchive(cFile.getAbsolutePath(), cFile,
tarFileOutputStream);
}
}
}
}
/**
* Untar DB snapshot tar file to recon OM snapshot directory.
*

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon;
import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@ -27,15 +28,16 @@ import static org.mockito.Mockito.when;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -66,6 +68,44 @@ public class TestReconUtils {
Assert.assertEquals(filePath, file.getAbsolutePath());
}
@Test
public void testCreateTarFile() throws Exception {
File tempSnapshotDir = null;
FileInputStream fis = null;
FileOutputStream fos = null;
File tarFile = null;
try {
String testDirName = System.getProperty("java.io.tmpdir");
if (!testDirName.endsWith("/")) {
testDirName += "/";
}
testDirName += "TestCreateTarFile_Dir" + System.currentTimeMillis();
tempSnapshotDir = new File(testDirName);
tempSnapshotDir.mkdirs();
File file = new File(testDirName + "/temp1.txt");
FileWriter writer = new FileWriter(file);
writer.write("Test data 1");
writer.close();
file = new File(testDirName + "/temp2.txt");
writer = new FileWriter(file);
writer.write("Test data 2");
writer.close();
tarFile = createTarFile(Paths.get(testDirName));
Assert.assertNotNull(tarFile);
} finally {
org.apache.hadoop.io.IOUtils.closeStream(fis);
org.apache.hadoop.io.IOUtils.closeStream(fos);
FileUtils.deleteDirectory(tempSnapshotDir);
FileUtils.deleteQuietly(tarFile);
}
}
@Test
public void testUntarCheckpointFile() throws Exception {
@ -87,7 +127,7 @@ public class TestReconUtils {
writer.close();
//Create test tar file.
File tarFile = OmUtils.createTarFile(newDir.toPath());
File tarFile = createTarFile(newDir.toPath());
File outputDir = folder.newFolder();
new ReconUtils().untarCheckpointFile(tarFile, outputDir.toPath());

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.recon.spi.impl;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -41,7 +42,6 @@ import java.io.InputStream;
import java.nio.file.Paths;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@ -96,7 +96,7 @@ public class TestOzoneManagerServiceProviderImpl extends
DBCheckpoint checkpoint = omMetadataManager.getStore()
.getCheckpoint(true);
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
File tarFile = createTarFile(checkpoint.getCheckpointLocation());
InputStream inputStream = new FileInputStream(tarFile);
ReconUtils reconUtilsMock = getMockReconUtils();
when(reconUtilsMock.makeHttpCall(any(), anyString()))
@ -147,7 +147,7 @@ public class TestOzoneManagerServiceProviderImpl extends
writer.close();
//Create test tar file.
File tarFile = OmUtils.createTarFile(checkpointDir.toPath());
File tarFile = createTarFile(checkpointDir.toPath());
InputStream fileInputStream = new FileInputStream(tarFile);
ReconUtils reconUtilsMock = getMockReconUtils();
when(reconUtilsMock.makeHttpCall(any(), anyString()))