HDDS-1136 : Add metric counters to capture the RocksDB checkpointing statistics.
Contributed by Aravindan Vijayan.
This commit is contained in:
parent
387dbe587a
commit
10b802b84b
|
@ -42,6 +42,11 @@ public interface DBCheckpoint {
|
||||||
*/
|
*/
|
||||||
long getLatestSequenceNumber();
|
long getLatestSequenceNumber();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time taken in milliseconds for the checkpoint to be created.
|
||||||
|
*/
|
||||||
|
long checkpointCreationTimeTaken();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy the contents of the specified checkpoint to ensure
|
* Destroy the contents of the specified checkpoint to ensure
|
||||||
* proper cleanup of the footprint on disk.
|
* proper cleanup of the footprint on disk.
|
||||||
|
|
|
@ -22,6 +22,8 @@ package org.apache.hadoop.utils.db;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -41,7 +43,6 @@ public class RDBCheckpointManager {
|
||||||
public static final String RDB_CHECKPOINT_DIR_PREFIX = "rdb_checkpoint_";
|
public static final String RDB_CHECKPOINT_DIR_PREFIX = "rdb_checkpoint_";
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(RDBCheckpointManager.class);
|
LoggerFactory.getLogger(RDBCheckpointManager.class);
|
||||||
public static final String JAVA_TMP_DIR = "java.io.tmpdir";
|
|
||||||
private String checkpointNamePrefix = "";
|
private String checkpointNamePrefix = "";
|
||||||
|
|
||||||
public RDBCheckpointManager(RocksDB rocksDB) {
|
public RDBCheckpointManager(RocksDB rocksDB) {
|
||||||
|
@ -79,12 +80,19 @@ public class RDBCheckpointManager {
|
||||||
checkpointDir += "_" + RDB_CHECKPOINT_DIR_PREFIX + currentTime;
|
checkpointDir += "_" + RDB_CHECKPOINT_DIR_PREFIX + currentTime;
|
||||||
|
|
||||||
Path checkpointPath = Paths.get(parentDir, checkpointDir);
|
Path checkpointPath = Paths.get(parentDir, checkpointDir);
|
||||||
|
Instant start = Instant.now();
|
||||||
checkpoint.createCheckpoint(checkpointPath.toString());
|
checkpoint.createCheckpoint(checkpointPath.toString());
|
||||||
|
Instant end = Instant.now();
|
||||||
|
|
||||||
|
long duration = Duration.between(start, end).toMillis();
|
||||||
|
LOG.debug("Created checkpoint at " + checkpointPath.toString() + " in "
|
||||||
|
+ duration + " milliseconds");
|
||||||
|
|
||||||
return new RocksDBCheckpoint(
|
return new RocksDBCheckpoint(
|
||||||
checkpointPath,
|
checkpointPath,
|
||||||
currentTime,
|
currentTime,
|
||||||
db.getLatestSequenceNumber()); //Best guesstimate here. Not accurate.
|
db.getLatestSequenceNumber(), //Best guesstimate here. Not accurate.
|
||||||
|
duration);
|
||||||
|
|
||||||
} catch (RocksDBException e) {
|
} catch (RocksDBException e) {
|
||||||
LOG.error("Unable to create RocksDB Snapshot.", e);
|
LOG.error("Unable to create RocksDB Snapshot.", e);
|
||||||
|
@ -97,13 +105,16 @@ public class RDBCheckpointManager {
|
||||||
private Path checkpointLocation;
|
private Path checkpointLocation;
|
||||||
private long checkpointTimestamp;
|
private long checkpointTimestamp;
|
||||||
private long latestSequenceNumber;
|
private long latestSequenceNumber;
|
||||||
|
private long checkpointCreationTimeTaken;
|
||||||
|
|
||||||
RocksDBCheckpoint(Path checkpointLocation,
|
RocksDBCheckpoint(Path checkpointLocation,
|
||||||
long snapshotTimestamp,
|
long snapshotTimestamp,
|
||||||
long latestSequenceNumber) {
|
long latestSequenceNumber,
|
||||||
|
long checkpointCreationTimeTaken) {
|
||||||
this.checkpointLocation = checkpointLocation;
|
this.checkpointLocation = checkpointLocation;
|
||||||
this.checkpointTimestamp = snapshotTimestamp;
|
this.checkpointTimestamp = snapshotTimestamp;
|
||||||
this.latestSequenceNumber = latestSequenceNumber;
|
this.latestSequenceNumber = latestSequenceNumber;
|
||||||
|
this.checkpointCreationTimeTaken = checkpointCreationTimeTaken;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -121,8 +132,14 @@ public class RDBCheckpointManager {
|
||||||
return this.latestSequenceNumber;
|
return this.latestSequenceNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long checkpointCreationTimeTaken() {
|
||||||
|
return checkpointCreationTimeTaken;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanupCheckpoint() throws IOException {
|
public void cleanupCheckpoint() throws IOException {
|
||||||
|
LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
|
||||||
FileUtils.deleteDirectory(checkpointLocation.toFile());
|
FileUtils.deleteDirectory(checkpointLocation.toFile());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,176 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.om;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.
|
||||||
|
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import javax.servlet.ServletContext;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.ServletOutputStream;
|
||||||
|
import javax.servlet.WriteListener;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.mockito.Matchers;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.
|
||||||
|
OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
|
||||||
|
import static org.mockito.Mockito.doCallRealMethod;
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class used for testing the OM DB Checkpoint provider servlet.
|
||||||
|
*/
|
||||||
|
public class TestOMDbCheckpointServlet {
|
||||||
|
private MiniOzoneCluster cluster = null;
|
||||||
|
private OMMetrics omMetrics;
|
||||||
|
private OzoneConfiguration conf;
|
||||||
|
private String clusterId;
|
||||||
|
private String scmId;
|
||||||
|
private String omId;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout timeout = new Timeout(60000);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a MiniDFSCluster for testing.
|
||||||
|
* <p>
|
||||||
|
* Ozone is made active by setting OZONE_ENABLED = true
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Before
|
||||||
|
public void init() throws Exception {
|
||||||
|
conf = new OzoneConfiguration();
|
||||||
|
clusterId = UUID.randomUUID().toString();
|
||||||
|
scmId = UUID.randomUUID().toString();
|
||||||
|
omId = UUID.randomUUID().toString();
|
||||||
|
conf.setBoolean(OZONE_ACL_ENABLED, true);
|
||||||
|
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
|
||||||
|
cluster = MiniOzoneCluster.newBuilder(conf)
|
||||||
|
.setClusterId(clusterId)
|
||||||
|
.setScmId(scmId)
|
||||||
|
.setOmId(omId)
|
||||||
|
.build();
|
||||||
|
cluster.waitForClusterToBeReady();
|
||||||
|
omMetrics = cluster.getOzoneManager().getMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown MiniDFSCluster.
|
||||||
|
*/
|
||||||
|
@After
|
||||||
|
public void shutdown() {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDoGet() throws ServletException, IOException {
|
||||||
|
|
||||||
|
File tempFile = null;
|
||||||
|
try {
|
||||||
|
OMDBCheckpointServlet omDbCheckpointServletMock =
|
||||||
|
mock(OMDBCheckpointServlet.class);
|
||||||
|
|
||||||
|
doCallRealMethod().when(omDbCheckpointServletMock).init();
|
||||||
|
|
||||||
|
HttpServletRequest requestMock = mock(HttpServletRequest.class);
|
||||||
|
HttpServletResponse responseMock = mock(HttpServletResponse.class);
|
||||||
|
|
||||||
|
ServletContext servletContextMock = mock(ServletContext.class);
|
||||||
|
when(omDbCheckpointServletMock.getServletContext())
|
||||||
|
.thenReturn(servletContextMock);
|
||||||
|
|
||||||
|
when(servletContextMock.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE))
|
||||||
|
.thenReturn(cluster.getOzoneManager());
|
||||||
|
when(requestMock.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH))
|
||||||
|
.thenReturn("true");
|
||||||
|
doNothing().when(responseMock).setContentType("application/x-tgz");
|
||||||
|
doNothing().when(responseMock).setHeader(Matchers.anyString(),
|
||||||
|
Matchers.anyString());
|
||||||
|
|
||||||
|
tempFile = File.createTempFile("testDoGet_" + System
|
||||||
|
.currentTimeMillis(), ".tar.gz");
|
||||||
|
|
||||||
|
FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
|
||||||
|
when(responseMock.getOutputStream()).thenReturn(
|
||||||
|
new ServletOutputStream() {
|
||||||
|
@Override
|
||||||
|
public boolean isReady() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWriteListener(WriteListener writeListener) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(int b) throws IOException {
|
||||||
|
fileOutputStream.write(b);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
doCallRealMethod().when(omDbCheckpointServletMock).doGet(requestMock,
|
||||||
|
responseMock);
|
||||||
|
|
||||||
|
omDbCheckpointServletMock.init();
|
||||||
|
|
||||||
|
Assert.assertTrue(
|
||||||
|
omMetrics.getLastCheckpointCreationTimeTaken() == 0);
|
||||||
|
Assert.assertTrue(
|
||||||
|
omMetrics.getLastCheckpointTarOperationTimeTaken() == 0);
|
||||||
|
Assert.assertTrue(
|
||||||
|
omMetrics.getLastCheckpointStreamingTimeTaken() == 0);
|
||||||
|
|
||||||
|
omDbCheckpointServletMock.doGet(requestMock, responseMock);
|
||||||
|
|
||||||
|
Assert.assertTrue(tempFile.length() > 0);
|
||||||
|
Assert.assertTrue(
|
||||||
|
omMetrics.getLastCheckpointCreationTimeTaken() > 0);
|
||||||
|
Assert.assertTrue(
|
||||||
|
omMetrics.getLastCheckpointTarOperationTimeTaken() > 0);
|
||||||
|
Assert.assertTrue(
|
||||||
|
omMetrics.getLastCheckpointStreamingTimeTaken() > 0);
|
||||||
|
} finally {
|
||||||
|
FileUtils.deleteQuietly(tempFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,8 @@ import static org.apache.hadoop.ozone.OzoneConsts.
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.servlet.http.HttpServlet;
|
import javax.servlet.http.HttpServlet;
|
||||||
|
@ -53,6 +55,7 @@ public class OMDBCheckpointServlet extends HttpServlet {
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
private transient DBStore omDbStore;
|
private transient DBStore omDbStore;
|
||||||
|
private transient OMMetrics omMetrics;
|
||||||
private transient DataTransferThrottler throttler = null;
|
private transient DataTransferThrottler throttler = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -67,6 +70,8 @@ public class OMDBCheckpointServlet extends HttpServlet {
|
||||||
}
|
}
|
||||||
|
|
||||||
omDbStore = om.getMetadataManager().getStore();
|
omDbStore = om.getMetadataManager().getStore();
|
||||||
|
omMetrics = om.getMetrics();
|
||||||
|
|
||||||
OzoneConfiguration configuration = om.getConfiguration();
|
OzoneConfiguration configuration = om.getConfiguration();
|
||||||
long transferBandwidth = configuration.getLongBytes(
|
long transferBandwidth = configuration.getLongBytes(
|
||||||
OMConfigKeys.OZONE_DB_CHECKPOINT_TRANSFER_RATE_KEY,
|
OMConfigKeys.OZONE_DB_CHECKPOINT_TRANSFER_RATE_KEY,
|
||||||
|
@ -112,19 +117,38 @@ public class OMDBCheckpointServlet extends HttpServlet {
|
||||||
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
omMetrics.setLastCheckpointCreationTimeTaken(
|
||||||
|
checkpoint.checkpointCreationTimeTaken());
|
||||||
|
|
||||||
|
Instant start = Instant.now();
|
||||||
checkPointTarFile = OmUtils.createTarFile(
|
checkPointTarFile = OmUtils.createTarFile(
|
||||||
checkpoint.getCheckpointLocation());
|
checkpoint.getCheckpointLocation());
|
||||||
LOG.info("Tar location = " + checkPointTarFile.getAbsolutePath());
|
Instant end = Instant.now();
|
||||||
|
|
||||||
|
long duration = Duration.between(start, end).toMillis();
|
||||||
|
LOG.debug("Time taken to archive the checkpoint : " + duration +
|
||||||
|
" milliseconds");
|
||||||
|
LOG.info("Checkpoint Tar location = " +
|
||||||
|
checkPointTarFile.getAbsolutePath());
|
||||||
|
omMetrics.setLastCheckpointTarOperationTimeTaken(duration);
|
||||||
|
|
||||||
response.setContentType("application/x-tgz");
|
response.setContentType("application/x-tgz");
|
||||||
response.setHeader("Content-Disposition",
|
response.setHeader("Content-Disposition",
|
||||||
"attachment; filename=\"" +
|
"attachment; filename=\"" +
|
||||||
checkPointTarFile.getName() + "\"");
|
checkPointTarFile.getName() + "\"");
|
||||||
|
|
||||||
checkpointFileInputStream = new FileInputStream(checkPointTarFile);
|
checkpointFileInputStream = new FileInputStream(checkPointTarFile);
|
||||||
|
start = Instant.now();
|
||||||
TransferFsImage.copyFileToStream(response.getOutputStream(),
|
TransferFsImage.copyFileToStream(response.getOutputStream(),
|
||||||
checkPointTarFile,
|
checkPointTarFile,
|
||||||
checkpointFileInputStream,
|
checkpointFileInputStream,
|
||||||
throttler);
|
throttler);
|
||||||
|
end = Instant.now();
|
||||||
|
|
||||||
|
duration = Duration.between(start, end).toMillis();
|
||||||
|
LOG.debug("Time taken to write the checkpoint to response output " +
|
||||||
|
"stream: " + duration + " milliseconds");
|
||||||
|
omMetrics.setLastCheckpointStreamingTimeTaken(duration);
|
||||||
|
|
||||||
checkpoint.cleanupCheckpoint();
|
checkpoint.cleanupCheckpoint();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is for maintaining Ozone Manager statistics.
|
* This class is for maintaining Ozone Manager statistics.
|
||||||
|
@ -105,6 +106,10 @@ public class OMMetrics {
|
||||||
// few minutes before restart may not be included in this count.
|
// few minutes before restart may not be included in this count.
|
||||||
private @Metric MutableCounterLong numKeys;
|
private @Metric MutableCounterLong numKeys;
|
||||||
|
|
||||||
|
// Metrics to track checkpointing statistics from last run.
|
||||||
|
private @Metric MutableGaugeLong lastCheckpointCreationTimeTaken;
|
||||||
|
private @Metric MutableGaugeLong lastCheckpointTarOperationTimeTaken;
|
||||||
|
private @Metric MutableGaugeLong lastCheckpointStreamingTimeTaken;
|
||||||
|
|
||||||
public OMMetrics() {
|
public OMMetrics() {
|
||||||
}
|
}
|
||||||
|
@ -390,6 +395,18 @@ public class OMMetrics {
|
||||||
numGetServiceListFails.incr();
|
numGetServiceListFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setLastCheckpointCreationTimeTaken(long val) {
|
||||||
|
this.lastCheckpointCreationTimeTaken.set(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLastCheckpointTarOperationTimeTaken(long val) {
|
||||||
|
this.lastCheckpointTarOperationTimeTaken.set(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLastCheckpointStreamingTimeTaken(long val) {
|
||||||
|
this.lastCheckpointStreamingTimeTaken.set(val);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreates() {
|
public long getNumVolumeCreates() {
|
||||||
return numVolumeCreates.value();
|
return numVolumeCreates.value();
|
||||||
|
@ -606,6 +623,21 @@ public class OMMetrics {
|
||||||
return numAbortMultipartUploadFails.value();
|
return numAbortMultipartUploadFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getLastCheckpointCreationTimeTaken() {
|
||||||
|
return lastCheckpointCreationTimeTaken.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getLastCheckpointTarOperationTimeTaken() {
|
||||||
|
return lastCheckpointTarOperationTimeTaken.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getLastCheckpointStreamingTimeTaken() {
|
||||||
|
return lastCheckpointStreamingTimeTaken.value();
|
||||||
|
}
|
||||||
|
|
||||||
public void unRegister() {
|
public void unRegister() {
|
||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
ms.unregisterSource(SOURCE_NAME);
|
ms.unregisterSource(SOURCE_NAME);
|
||||||
|
|
Loading…
Reference in New Issue