mirror of https://github.com/apache/druid.git
Add freeSpacePercent config in segment location to enforce free space while storing segments (#5137)
* Add freeSpacePercent config in segment location config to enforce free space while storing segments
* address review comments
* address review comments: more doc on freeSpacePercent and use Double for freeSpacePercent
This commit is contained in:
parent
e538aa227b
commit
0f5c7d1aec
@@ -31,7 +31,7 @@ The historical node uses several of the global configs in [Configuration](../con
 |Property|Description|Default|
 |--------|-----------|-------|
-|`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. This value cannot be NULL or EMPTY. Here is an example `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 10000}]`.| none |
+|`druid.segmentCache.locations`|Segments assigned to a Historical node are first stored on the local file system (in a disk cache) and then served by the Historical node. These locations define where that local cache resides. This value cannot be NULL or EMPTY. Here is an example: `druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 10000, "freeSpacePercent": 1.0}]`. "freeSpacePercent" is optional; if provided, that percentage of the disk partition is kept free while storing segments. It relies on File.getTotalSpace() and File.getFreeSpace(), so enable it only if those methods work for your file system.| none |
 |`druid.segmentCache.deleteOnRemove`|Delete segment files from cache once a node is no longer serving a segment.|true|
 |`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|30000 (30 seconds)|
 |`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
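For context, here is a hedged sketch of how such a location entry might look in a Historical node's runtime.properties; the path and size values are illustrative and are not taken from this commit:

```properties
# Illustrative values only: cap the segment cache at ~300 GB and additionally
# ask the loader to keep roughly 5% of the partition free.
druid.segmentCache.locations=[{"path": "/mnt/druidSegments", "maxSize": 300000000000, "freeSpacePercent": 5.0}]
```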
@@ -39,6 +39,8 @@ The historical node uses several of the global configs in [Configuration](../con
 |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage.|10|
 |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads|
 
+In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. If a Druid bug leaves unaccounted segment files on disk, or some other process writes to the same partition, this check makes segment loading start failing early, before the disk fills up completely, leaving the host otherwise usable.
+
 ### Query Configs
 
 #### Concurrent Requests
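To make the check concrete, here is a minimal, self-contained Java sketch of the percent-to-bytes arithmetic described above; the partition size, segment size, and percentage are made-up numbers, and none of the names below are Druid API.

```java
public class FreeSpaceExample
{
  public static void main(String[] args)
  {
    // Illustrative numbers only: a 500 GB partition with freeSpacePercent = 5.0.
    long totalSpaceInPartition = 500_000_000_000L; // what File.getTotalSpace() might report
    double freeSpacePercent = 5.0;

    // Same arithmetic as the StorageLocation constructor in this patch: 5% of 500 GB = 25 GB.
    long freeSpaceToKeep = (long) ((freeSpacePercent * totalSpaceInPartition) / 100);

    // A segment is accepted only if loading it still leaves at least freeSpaceToKeep bytes free.
    long segmentSize = 10_000_000_000L;   // 10 GB segment
    long currFreeSpace = 30_000_000_000L; // what File.getFreeSpace() might report
    boolean fitsFreeSpaceCheck = (freeSpaceToKeep + segmentSize) <= currFreeSpace;

    // Here 25 GB + 10 GB > 30 GB, so the segment would be rejected by this location.
    System.out.printf("freeSpaceToKeep=%d bytes, segment fits: %b%n", freeSpaceToKeep, fitsFreeSpaceCheck);
  }
}
```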
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
-import com.metamx.common.ISE;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.guice.annotations.Json;
 import io.druid.segment.IndexIO;
@@ -73,7 +72,11 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     this.locations = Lists.newArrayList();
     for (StorageLocationConfig locationConfig : config.getLocations()) {
-      locations.add(new StorageLocation(locationConfig.getPath(), locationConfig.getMaxSize()));
+      locations.add(new StorageLocation(
+          locationConfig.getPath(),
+          locationConfig.getMaxSize(),
+          locationConfig.getFreeSpacePercent()
+      ));
     }
   }
 
@@ -141,33 +144,28 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
   private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException
   {
     for (StorageLocation loc : getSortedList(locations)) {
-      // locIter is ordered from empty to full
-      if (!loc.canHandle(segment.getSize())) {
-        throw new ISE(
-            "Segment[%s:%,d] too large for storage[%s:%,d].",
-            segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
-        );
-      }
-      File storageDir = new File(loc.getPath(), storageDirStr);
-
-      try {
-        loadInLocationWithStartMarker(segment, storageDir);
-        return loc;
-      }
-      catch (SegmentLoadingException e) {
-        log.makeAlert(
-            e,
-            "Failed to load segment in current location %s, try next location if any",
-            loc.getPath().getAbsolutePath()
-        )
-           .addData("location", loc.getPath().getAbsolutePath())
-           .emit();
-
-        try {
-          cleanupCacheFiles(loc.getPath(), storageDir);
-        }
-        catch (IOException e1) {
-          log.error(e1, "Failed to cleanup location " + storageDir.getAbsolutePath());
+      if (loc.canHandle(segment)) {
+        File storageDir = new File(loc.getPath(), storageDirStr);
+
+        try {
+          loadInLocationWithStartMarker(segment, storageDir);
+          return loc;
+        }
+        catch (SegmentLoadingException e) {
+          log.makeAlert(
+              e,
+              "Failed to load segment in current location %s, try next location if any",
+              loc.getPath().getAbsolutePath()
+          )
+             .addData("location", loc.getPath().getAbsolutePath())
+             .emit();
+
+          try {
+            cleanupCacheFiles(loc.getPath(), storageDir);
+          }
+          catch (IOException e1) {
+            log.error(e1, "Failed to cleanup location " + storageDir.getAbsolutePath());
+          }
         }
       }
     }
@@ -20,8 +20,10 @@
 package io.druid.segment.loading;
 
 import com.google.common.collect.Sets;
+import io.druid.java.util.common.logger.Logger;
 import io.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Set;
 
@@ -29,17 +31,33 @@ import java.util.Set;
  */
 class StorageLocation
 {
+  private static final Logger log = new Logger(StorageLocation.class);
+
   private final File path;
   private final long maxSize;
+  private final long freeSpaceToKeep;
   private final Set<DataSegment> segments;
 
   private volatile long currSize = 0;
 
-  StorageLocation(File path, long maxSize)
+  StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
   {
     this.path = path;
     this.maxSize = maxSize;
+
+    if (freeSpacePercent != null) {
+      long totalSpaceInPartition = path.getTotalSpace();
+      this.freeSpaceToKeep = (long) ((freeSpacePercent * totalSpaceInPartition) / 100);
+      log.info(
+          "SegmentLocation[%s] will try and maintain [%d:%d] free space while loading segments.",
+          path,
+          freeSpaceToKeep,
+          totalSpaceInPartition
+      );
+    } else {
+      this.freeSpaceToKeep = 0;
+    }
+
     this.segments = Sets.newHashSet();
   }
 
@@ -67,9 +85,33 @@ class StorageLocation
     }
   }
 
-  boolean canHandle(long size)
+  boolean canHandle(DataSegment segment)
   {
-    return available() >= size;
+    if (available() < segment.getSize()) {
+      log.warn(
+          "Segment[%s:%,d] too large for storage[%s:%,d].",
+          segment.getIdentifier(), segment.getSize(), getPath(), available()
+      );
+      return false;
+    }
+
+    if (freeSpaceToKeep > 0) {
+      long currFreeSpace = path.getFreeSpace();
+      if ((freeSpaceToKeep + segment.getSize()) > currFreeSpace) {
+        log.warn(
+            "Segment[%s:%,d] too large for storage[%s:%,d] to maintain suggested freeSpace[%d], current freeSpace is [%d].",
+            segment.getIdentifier(),
+            segment.getSize(),
+            getPath(),
+            available(),
+            freeSpaceToKeep,
+            currFreeSpace
+        );
+        return false;
+      }
+    }
+
+    return true;
   }
 
   synchronized long available()
@@ -37,6 +37,9 @@ public class StorageLocationConfig
   @Min(1)
   private long maxSize = Long.MAX_VALUE;
 
+  @JsonProperty
+  private Double freeSpacePercent;
+
   public File getPath()
   {
     return path;
@@ -59,6 +62,17 @@ public class StorageLocationConfig
     return this;
   }
 
+  public Double getFreeSpacePercent()
+  {
+    return freeSpacePercent;
+  }
+
+  public StorageLocationConfig setFreeSpacePercent(Double freeSpacePercent)
+  {
+    this.freeSpacePercent = freeSpacePercent;
+    return this;
+  }
+
   @Override
   public String toString()
   {
@@ -36,7 +36,7 @@ public class StorageLocationTest
   public void testStorageLocation() throws Exception
   {
     long expectedAvail = 1000L;
-    StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail);
+    StorageLocation loc = new StorageLocation(new File("/tmp"), expectedAvail, null);
 
     verifyLoc(expectedAvail, loc);
 
@@ -69,7 +69,7 @@ public class StorageLocationTest
   {
     Assert.assertEquals(maxSize, loc.available());
     for (int i = 0; i <= maxSize; ++i) {
-      Assert.assertTrue(String.valueOf(i), loc.canHandle(i));
+      Assert.assertTrue(String.valueOf(i), loc.canHandle(makeSegment("2013/2014", i)));
     }
   }
 