Move MSQ temporary storage to a runtime parameter instead of being configured from query context (#14061)

* 
    Adds new run time parameter druid.indexer.task.tmpStorageBytesPerTask. This sets a limit for the amount of temporary storage disk space used by tasks. This limit is currently only respected by MSQ tasks.
*   Removes query context parameters intermediateSuperSorterStorageMaxLocalBytes and composedIntermediateSuperSorterStorageEnabled. Composed intermediate super sorter (which was enabled by composedIntermediateSuperSorterStorageEnabled) is now enabled automatically if durableShuffleStorage is set to true. intermediateSuperSorterStorageMaxLocalBytes is calculated from the limit set by the run time parameter druid.indexer.task.tmpStorageBytesPerTask.
This commit is contained in:
Adarsh Sanjeev 2023-04-18 16:56:51 +05:30 committed by GitHub
parent 8eb854c845
commit a7d5c64aeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 375 additions and 58 deletions

View File

@ -1521,6 +1521,7 @@ Additional peon configs include:
|`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch-input-source.md) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
|`druid.indexer.task.storeEmptyColumns`|Boolean value for whether or not to store empty columns during ingestion. When set to true, Druid stores every column specified in the [`dimensionsSpec`](../ingestion/ingestion-spec.md#dimensionsspec). If you use schemaless ingestion and don't specify any dimensions to ingest, you must also set [`includeAllDimensions`](../ingestion/ingestion-spec.md#dimensionsspec) for Druid to store empty columns.<br/><br/>If you set `storeEmptyColumns` to false, Druid SQL queries referencing empty columns will fail. If you intend to leave `storeEmptyColumns` disabled, you should either ingest dummy data for empty columns or else not query on empty columns.<br/><br/>This configuration can be overwritten by setting `storeEmptyColumns` in the [task context](../ingestion/tasks.md#context-parameters).|true|
|`druid.indexer.task.tmpStorageBytesPerTask`|Maximum number of bytes per task to be used to store temporary files on disk. This usage is split among all temporary storage usages for the task. An exception might be thrown if this limit is too low for the task or if this limit would be exceeded. This limit is currently respected only by MSQ tasks. Other types of tasks might exceed this limit. A value of -1 disables this limit. |-1|
|`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
If the peon is running in remote mode, there must be an Overlord up and running. Peons in remote mode can set the following configurations:

View File

@ -337,6 +337,11 @@ cleaner can be scheduled to clean the directories corresponding to which there i
the storage connector to work upon the durable storage. The durable storage location should only be utilized to store the output
for cluster's MSQ tasks. If the location contains other files or directories, then they will get cleaned up as well.
Enabling durable storage also enables the use of local disk to store temporary files, such as the intermediate files produced
by the super sorter. The limit set by `druid.indexer.task.tmpStorageBytesPerTask` for maximum number of bytes of local
storage to be used per task will be respected by MSQ tasks. If the configured limit is too low, `NotEnoughTemporaryStorageFault`
may be thrown.
### Enable durable storage
To enable durable storage, you need to set the following common service properties:
@ -434,6 +439,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| <a name="error_TooManyWarnings">`TooManyWarnings`</a> | Exceeded the maximum allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| <a name="error_TooManyWorkers">`TooManyWorkers`</a> | Exceeded the maximum number of simultaneously-running workers. See the [Limits](#limits) table for more details. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. If this is lower than the hard limit (1,000 workers), then you can increase the limit by adding more memory to each task. |
| <a name="error_NotEnoughMemory">`NotEnoughMemory`</a> | Insufficient memory to launch a stage. | `suggestedServerMemory`: Suggested number of bytes of memory to allocate to a given process. <br /><br />`serverMemory`: The number of bytes of memory available to a single process.<br /><br />`usableMemory`: The number of usable bytes of memory for a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of threads in a single process. |
| <a name="error_NotEnoughTemporaryStorage">`NotEnoughTemporaryStorage`</a> | Insufficient temporary storage configured to launch a stage. This limit is set by the property `druid.indexer.task.tmpStorageBytesPerTask`. This property should be increased to the minimum suggested limit to resolve this.| `suggestedMinimumStorage`: Suggested number of bytes of temporary storage space to allocate to a given process. <br /><br />`configuredTemporaryStorage`: The number of bytes of storage currently configured. |
| <a name="error_WorkerFailed">`WorkerFailed`</a> | A worker task failed unexpectedly. | `errorMsg`<br /><br />`workerTaskId`: The ID of the worker task. |
| <a name="error_WorkerRpcFailed">`WorkerRpcFailed`</a> | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task |
| <a name="error_UnknownError">`UnknownError`</a> | All other errors. | `message` |

View File

@ -597,16 +597,6 @@ public class ControllerImpl implements Controller
.put(
MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE,
isDurableStorageEnabled
).put(
MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE,
MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(
task.getQuerySpec().getQuery().context()
)
).put(
MultiStageQueryContext.CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES,
MultiStageQueryContext.getIntermediateSuperSorterStorageMaxLocalBytes(
task.getQuerySpec().getQuery().context()
)
).put(
MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED,
maxParseExceptions

View File

@ -19,6 +19,7 @@
package org.apache.druid.msq.exec;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
@ -179,6 +180,7 @@ public class WorkerImpl implements Worker
private final ConcurrentHashMap<StageId, WorkerStageKernel> stageKernelMap = new ConcurrentHashMap<>();
private final ByteTracker intermediateSuperSorterLocalStorageTracker;
private final boolean durableStageStorageEnabled;
private final WorkerStorageParameters workerStorageParameters;
/**
* Set once in {@link #runTask} and never reassigned.
@ -197,17 +199,31 @@ public class WorkerImpl implements Worker
private volatile boolean controllerAlive = true;
public WorkerImpl(MSQWorkerTask task, WorkerContext context)
{
this(
task,
context,
WorkerStorageParameters.createProductionInstance(
context.injector(),
MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext())) // If Durable Storage is enabled, then super sorter intermediate storage can be enabled.
)
);
}
@VisibleForTesting
public WorkerImpl(MSQWorkerTask task, WorkerContext context, WorkerStorageParameters workerStorageParameters)
{
this.task = task;
this.context = context;
this.selfDruidNode = context.selfNode();
this.processorBouncer = context.processorBouncer();
this.intermediateSuperSorterLocalStorageTracker = new ByteTracker(
MultiStageQueryContext.getIntermediateSuperSorterStorageMaxLocalBytes(QueryContext.of(task.getContext()))
);
this.durableStageStorageEnabled = MultiStageQueryContext.isDurableStorageEnabled(
QueryContext.of(task.getContext())
);
this.workerStorageParameters = workerStorageParameters;
long maxBytes = workerStorageParameters.isIntermediateStorageLimitConfigured() ? workerStorageParameters.getIntermediateSuperSorterStorageMaxLocalBytes() : Long.MAX_VALUE;
this.intermediateSuperSorterLocalStorageTracker = new ByteTracker(maxBytes);
}
@Override
@ -731,8 +747,7 @@ public class WorkerImpl implements Worker
final FileOutputChannelFactory fileOutputChannelFactory =
new FileOutputChannelFactory(fileChannelDirectory, frameSize, intermediateSuperSorterLocalStorageTracker);
if (MultiStageQueryContext.isComposedIntermediateSuperSorterStorageEnabled(QueryContext.of(task.getContext()))
&& durableStageStorageEnabled) {
if (durableStageStorageEnabled && workerStorageParameters.isIntermediateStorageLimitConfigured()) {
return new ComposingOutputChannelFactory(
ImmutableList.of(
fileOutputChannelFactory,

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Injector;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughTemporaryStorageFault;
import java.util.Objects;
/**
* Class for determining the amount of temporary disk space to allocate to various purposes, given the per-worker limit.
* Similar to {@link WorkerMemoryParameters}, but for temporary disk space.
*
* Currently only used to allocate disk space for intermediate output from super sorter storage, if intermediate super
* sorter storage is enabled.
*
* If it is enabled, keeps {@link #MINIMUM_BASIC_OPERATIONS_BYTES} for miscellaneous operations and
* configures the super sorter to use {@link #SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION} of the remaining space for
* intermediate files. If this value is less than {@link #MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES},
* {@link NotEnoughTemporaryStorageFault} is thrown.
*/
public class WorkerStorageParameters
{
/**
* Fraction of temporary worker storage that can be allocated to super sorter intermediate files.
*/
private static final double SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION = 0.8;
/**
* Fixed amount of temporary disc storage reserved for miscellaneous operations.
*/
private static final long MINIMUM_BASIC_OPERATIONS_BYTES = 1_000_000_000L;
/**
* Minimum threshold for number of bytes required for intermediate files. If the number of bytes is less than this
* threshold and intermediate super sorter storage is enabled, {@link NotEnoughTemporaryStorageFault} is thrown.
*/
private static final long MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES = 1_000_000_000L;
private final long intermediateSuperSorterStorageMaxLocalBytes;
private WorkerStorageParameters(final long intermediateSuperSorterStorageMaxLocalBytes)
{
this.intermediateSuperSorterStorageMaxLocalBytes = intermediateSuperSorterStorageMaxLocalBytes;
}
public static WorkerStorageParameters createProductionInstance(
final Injector injector,
final boolean isIntermediateSuperSorterStorageEnabled
)
{
long tmpStorageBytesPerTask = injector.getInstance(TaskConfig.class).getTmpStorageBytesPerTask();
return createInstance(tmpStorageBytesPerTask, isIntermediateSuperSorterStorageEnabled);
}
@VisibleForTesting
public static WorkerStorageParameters createInstanceForTests(final long tmpStorageBytesPerTask)
{
return new WorkerStorageParameters(tmpStorageBytesPerTask);
}
/**
* Returns an object specifiying temporary disk-usage parameters
* @param tmpStorageBytesPerTask amount of disk space to be allocated per task for intermediate files.
* @param isIntermediateSuperSorterStorageEnabled whether intermediate super sorter storage is enabled
*/
public static WorkerStorageParameters createInstance(
final long tmpStorageBytesPerTask,
final boolean isIntermediateSuperSorterStorageEnabled
)
{
if (!isIntermediateSuperSorterStorageEnabled || tmpStorageBytesPerTask == -1) {
return new WorkerStorageParameters(-1);
}
Preconditions.checkArgument(tmpStorageBytesPerTask > 0, "Temporary storage bytes passed: [%s] should be > 0", tmpStorageBytesPerTask);
long intermediateSuperSorterStorageMaxLocalBytes = computeUsableStorage(tmpStorageBytesPerTask);
if (intermediateSuperSorterStorageMaxLocalBytes < MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES) {
throw new MSQException(
new NotEnoughTemporaryStorageFault(
calculateSuggestedMinTemporaryStorage(),
tmpStorageBytesPerTask
)
);
}
return new WorkerStorageParameters(intermediateSuperSorterStorageMaxLocalBytes);
}
private static long computeUsableStorage(long tmpStorageBytesPerTask)
{
return (long) (SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION * (tmpStorageBytesPerTask - MINIMUM_BASIC_OPERATIONS_BYTES));
}
private static long calculateSuggestedMinTemporaryStorage()
{
return MINIMUM_BASIC_OPERATIONS_BYTES + (long) (MINIMUM_SUPER_SORTER_TMP_STORAGE_BYTES / SUPER_SORTER_TMP_STORAGE_USABLE_FRACTION);
}
public long getIntermediateSuperSorterStorageMaxLocalBytes()
{
return intermediateSuperSorterStorageMaxLocalBytes;
}
public boolean isIntermediateStorageLimitConfigured()
{
return intermediateSuperSorterStorageMaxLocalBytes != -1;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerStorageParameters that = (WorkerStorageParameters) o;
return intermediateSuperSorterStorageMaxLocalBytes == that.intermediateSuperSorterStorageMaxLocalBytes;
}
@Override
public int hashCode()
{
return Objects.hash(intermediateSuperSorterStorageMaxLocalBytes);
}
@Override
public String toString()
{
return "WorkerStorageParameters{" +
"intermediateSuperSorterStorageMaxLocalBytes=" + intermediateSuperSorterStorageMaxLocalBytes +
'}';
}
}

View File

@ -52,6 +52,7 @@ import org.apache.druid.msq.indexing.error.InsertTimeOutOfBoundsFault;
import org.apache.druid.msq.indexing.error.InvalidNullByteFault;
import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.NotEnoughTemporaryStorageFault;
import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.indexing.error.QueryRuntimeFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
@ -117,6 +118,7 @@ public class MSQIndexingModule implements DruidModule
InsertTimeNullFault.class,
InsertTimeOutOfBoundsFault.class,
InvalidNullByteFault.class,
NotEnoughTemporaryStorageFault.class,
NotEnoughMemoryFault.class,
QueryNotSupportedFault.class,
QueryRuntimeFault.class,

View File

@ -47,7 +47,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
{
super(
CODE,
"Not enough memory. Required al teast %,d bytes. (total = %,d bytes; usable = %,d bytes; server workers = %,d; server threads = %,d). Increase JVM memory with the -xmx option"
"Not enough memory. Required at least %,d bytes. (total = %,d bytes; usable = %,d bytes; server workers = %,d; server threads = %,d). Increase JVM memory with the -xmx option"
+ (serverWorkers > 1 ? " or reduce number of server workers" : ""),
suggestedServerMemory,
serverMemory,

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.error;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.Objects;
@JsonTypeName(NotEnoughTemporaryStorageFault.CODE)
public class NotEnoughTemporaryStorageFault extends BaseMSQFault
{
static final String CODE = "NotEnoughTemporaryStorageFault";
private final long suggestedMinimumStorage;
private final long configuredTemporaryStorage;
@JsonCreator
public NotEnoughTemporaryStorageFault(
@JsonProperty("suggestedMinimumStorage") final long suggestedMinimumStorage,
@JsonProperty("configuredTemporaryStorage") final long configuredTemporaryStorage
)
{
super(
CODE,
"Not enough temporary storage space for intermediate files. Requires at least %,d bytes. (configured = %,d bytes). Increase the limit by increasing tmpStorageBytesPerTask or "
+ "disable durable storage by setting the context parameter durableShuffleStorage as false.",
suggestedMinimumStorage,
configuredTemporaryStorage
);
this.suggestedMinimumStorage = suggestedMinimumStorage;
this.configuredTemporaryStorage = configuredTemporaryStorage;
}
@JsonProperty
public long getSuggestedMinimumStorage()
{
return suggestedMinimumStorage;
}
@JsonProperty
public long getConfiguredTemporaryStorage()
{
return configuredTemporaryStorage;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
NotEnoughTemporaryStorageFault that = (NotEnoughTemporaryStorageFault) o;
return suggestedMinimumStorage == that.suggestedMinimumStorage
&& configuredTemporaryStorage == that.configuredTemporaryStorage;
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), suggestedMinimumStorage, configuredTemporaryStorage);
}
@Override
public String toString()
{
return "NotEnoughTemporaryStorageFault{" +
"suggestedMinimumStorage=" + suggestedMinimumStorage +
", configuredTemporaryStorage=" + configuredTemporaryStorage +
'}';
}
}

View File

@ -75,7 +75,7 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa
) throws IOException
{
// BaseLeafFrameProcessorFactory is used for native Druid queries, where the following input cases can happen:
// 1) Union datasources: N nonbroadcast inputs, which are are treated as one big input
// 1) Union datasources: N nonbroadcast inputs, which are treated as one big input
// 2) Join datasources: one nonbroadcast input, N broadcast inputs
// 3) All other datasources: single input

View File

@ -51,13 +51,6 @@ import java.util.stream.Collectors;
* List of context parameters not present in external docs:
* <br></br>
* <ol>
* <li><b>composedIntermediateSuperSorterStorageEnabled</b>: Whether to enable automatic fallback to durable storage from
* local storage for sorting's intermediate data. Requires to set-up `intermediateSuperSorterStorageMaxLocalBytes` limit
* for local storage and durable shuffle storage feature as well. Default value is <b>false</b>.</li>
*
* <li><b>intermediateSuperSorterStorageMaxLocalBytes</b>: Whether to enable a byte limit on local storage for
* sorting's intermediate data. If that limit is crossed,the task fails with {@link org.apache.druid.query.ResourceLimitExceededException}`.
* Default value is <b>9223372036854775807</b> </li>
*
* <li><b>maxInputBytesPerWorker</b>: Should be used in conjunction with taskAssignment `auto` mode. When dividing the
* input of a stage among the workers, this parameter determines the maximum size in bytes that are given to a single worker
@ -97,13 +90,6 @@ public class MultiStageQueryContext
public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.PARALLEL.toString();
public static final String CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES =
"intermediateSuperSorterStorageMaxLocalBytes";
public static final String CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE =
"composedIntermediateSuperSorterStorageEnabled";
private static final boolean DEFAULT_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE = false;
private static final long DEFAULT_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES = Long.MAX_VALUE;
public static final String CTX_DESTINATION = "destination";
private static final String DEFAULT_DESTINATION = null;
@ -157,22 +143,6 @@ public class MultiStageQueryContext
);
}
public static boolean isComposedIntermediateSuperSorterStorageEnabled(final QueryContext queryContext)
{
return queryContext.getBoolean(
CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE,
DEFAULT_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE
);
}
public static long getIntermediateSuperSorterStorageMaxLocalBytes(final QueryContext queryContext)
{
return queryContext.getLong(
CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES,
DEFAULT_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES
);
}
public static ClusterStatisticsMergeMode getClusterStatisticsMergeMode(QueryContext queryContext)
{
return ClusterStatisticsMergeMode.valueOf(

View File

@ -40,14 +40,14 @@ public class WorkerImplTest
@Test
public void testFetchStatsThrows()
{
WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>(), 0), workerContext);
WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>(), 0), workerContext, WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE));
Assert.assertThrows(ISE.class, () -> worker.fetchStatisticsSnapshot(new StageId("xx", 1)));
}
@Test
public void testFetchStatsWithTimeChunkThrows()
{
WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>(), 0), workerContext);
WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>(), 0), workerContext, WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE));
Assert.assertThrows(ISE.class, () -> worker.fetchStatisticsSnapshotForTimeChunk(new StageId("xx", 1), 1L));
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughTemporaryStorageFault;
import org.junit.Assert;
import org.junit.Test;
public class WorkerStorageParametersTest
{
@Test
public void test_WorkerStorageParameter_createInstance()
{
Assert.assertEquals(WorkerStorageParameters.createInstanceForTests(1000000000), WorkerStorageParameters.createInstance(2_250_000_000L, true));
}
@Test
public void test_insufficientTemporaryStorage()
{
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> WorkerStorageParameters.createInstance(2_000L, true)
);
Assert.assertEquals(new NotEnoughTemporaryStorageFault(2250000000L, 2000), e.getFault());
}
}

View File

@ -81,6 +81,7 @@ public class MSQFaultSerdeTest
assertFaultSerde(UnknownFault.forMessage("the message"));
assertFaultSerde(new WorkerFailedFault("the worker task", "the error msg"));
assertFaultSerde(new WorkerRpcFailedFault("the worker task"));
assertFaultSerde(new NotEnoughTemporaryStorageFault(250, 2));
}
@Test

View File

@ -232,8 +232,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, true)
.put(MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE, true)
.put(MultiStageQueryContext.CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES, 100) // added so that practically everything still goes to durable storage channel
.build();
@ -241,8 +239,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put(MultiStageQueryContext.CTX_FAULT_TOLERANCE, true)
.put(MultiStageQueryContext.CTX_COMPOSED_INTERMEDIATE_SUPER_SORTER_STORAGE, true)
.put(MultiStageQueryContext.CTX_INTERMEDIATE_SUPER_SORTER_STORAGE_MAX_LOCAL_BYTES, 100) // added so that practically everything still goes to durable storage channel
.build();
public static final Map<String, Object> SEQUENTIAL_MERGE_MSQ_CONTEXT =

View File

@ -43,7 +43,10 @@ import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.msq.exec.WorkerManagerClient;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.mockito.ArgumentMatchers;
@ -116,9 +119,19 @@ public class MSQTestControllerContext implements ControllerContext
if (controller == null) {
throw new ISE("Controller needs to be set using the register method");
}
WorkerStorageParameters workerStorageParameters;
// If we are testing durable storage, set a low limit on storage so that the durable storage will be used.
if (MultiStageQueryContext.isDurableStorageEnabled(QueryContext.of(task.getContext()))) {
workerStorageParameters = WorkerStorageParameters.createInstanceForTests(100);
} else {
workerStorageParameters = WorkerStorageParameters.createInstanceForTests(Long.MAX_VALUE);
}
Worker worker = new WorkerImpl(
task,
new MSQTestWorkerContext(inMemoryWorkers, controller, mapper, injector, workerMemoryParameters)
new MSQTestWorkerContext(inMemoryWorkers, controller, mapper, injector, workerMemoryParameters),
workerStorageParameters
);
inMemoryWorkers.put(task.getId(), worker);
statusMap.put(task.getId(), TaskStatus.running(task.getId()));

View File

@ -78,6 +78,7 @@ public class TaskConfig
private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true;
private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1;
@JsonProperty
private final String baseDir;
@ -121,6 +122,9 @@ public class TaskConfig
@JsonProperty
private final boolean encapsulatedTask;
@JsonProperty
private final long tmpStorageBytesPerTask;
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@ -137,7 +141,8 @@ public class TaskConfig
// deprecated, only set to true to fall back to older behavior
@JsonProperty("batchProcessingMode") String batchProcessingMode,
@JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
@JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@ -183,6 +188,7 @@ public class TaskConfig
}
log.debug("Batch processing mode:[%s]", this.batchProcessingMode);
this.storeEmptyColumns = storeEmptyColumns == null ? DEFAULT_STORE_EMPTY_COLUMNS : storeEmptyColumns;
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask == null ? DEFAULT_TMP_STORAGE_BYTES_PER_TASK : tmpStorageBytesPerTask;
}
private TaskConfig(
@ -199,7 +205,8 @@ public class TaskConfig
boolean batchMemoryMappedIndex,
BatchProcessingMode batchProcessingMode,
boolean storeEmptyColumns,
boolean encapsulatedTask
boolean encapsulatedTask,
long tmpStorageBytesPerTask
)
{
this.baseDir = baseDir;
@ -216,6 +223,7 @@ public class TaskConfig
this.batchProcessingMode = batchProcessingMode;
this.storeEmptyColumns = storeEmptyColumns;
this.encapsulatedTask = encapsulatedTask;
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
}
@JsonProperty
@ -326,6 +334,12 @@ public class TaskConfig
return encapsulatedTask;
}
@JsonProperty
public long getTmpStorageBytesPerTask()
{
return tmpStorageBytesPerTask;
}
private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
@ -351,7 +365,8 @@ public class TaskConfig
batchMemoryMappedIndex,
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask
encapsulatedTask,
tmpStorageBytesPerTask
);
}
}

View File

@ -40,6 +40,7 @@ public class TaskConfigBuilder
private String batchProcessingMode;
private Boolean storeEmptyColumns;
private boolean enableTaskLevelLogPush;
private Long tmpStorageBytesPerTask;
public TaskConfigBuilder setBaseDir(String baseDir)
{
@ -125,6 +126,12 @@ public class TaskConfigBuilder
return this;
}
public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask)
{
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
return this;
}
public TaskConfig build()
{
return new TaskConfig(
@ -141,7 +148,8 @@ public class TaskConfigBuilder
batchMemoryMappedIndex,
batchProcessingMode,
storeEmptyColumns,
enableTaskLevelLogPush
enableTaskLevelLogPush,
tmpStorageBytesPerTask
);
}
}

View File

@ -611,6 +611,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.setBaseDir(temporaryFolder.newFolder().toString())
.setDefaultRowFlushBoundary(50000)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.setTmpStorageBytesPerTask(-1L)
.build();
return new TaskToolboxFactory(