Use worker number instead of task id in MSQ for communication to/from workers. (#13062)

* Conversion from taskId to workerNumber in the workerClient

* storage connector changes, suffix the file when finished writing to it

* Fix tests

* Trigger Build

* convert IntFunction to a dedicated interface

* first review round

* use a dummy file to indicate success

* fetch the first filename from the list in case of multiple files

* tests working, fix semantic issue with ls

* change how the success flag works

* comments, checkstyle, method rename

* fix test

* forbiddenapis fix

* Trigger Build

* change the writer

* dead store fix

* Review comments

* revert changes

* review

* review comments

* Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>

* Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/DurableStorageInputChannelFactory.java

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>

* update error messages

* better error messages

* fix checkstyle

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>
Laksh Singla 2022-11-03 10:25:45 +05:30 committed by GitHub
parent 018f984781
commit 7cb21cb968
16 changed files with 349 additions and 81 deletions

LocalFileStorageConnector.java

@@ -31,7 +31,7 @@ import java.io.OutputStream;
 import java.nio.file.Files;
 
 /**
- * Implementation that uses local filesystem. All paths are appended with the base path, in such a way that its not visible
+ * Implementation that uses local filesystem. All paths are appended with the base path, in such a way that it is not visible
  * to the users of this class.
  */
 public class LocalFileStorageConnector implements StorageConnector
@@ -54,10 +54,6 @@ public class LocalFileStorageConnector implements StorageConnector
   /**
    * Reads the file present as basePath + path. Will throw an IO exception in case the file is not present.
    * Closing of the stream is the responsibility of the caller.
-   *
-   * @param path
-   * @return
-   * @throws IOException
    */
   @Override
   public InputStream read(String path) throws IOException

ControllerClient.java

@@ -71,7 +71,6 @@ public interface ControllerClient extends AutoCloseable
   /**
    * Client side method to inform the controller about the warnings generated by the given worker.
    */
   void postWorkerWarning(
-      String workerId,
       List<MSQErrorReport> MSQErrorReports
   ) throws IOException;
 
   List<String> getTaskList() throws IOException;

ControllerContext.java

@@ -74,7 +74,6 @@ public interface ControllerContext
   /**
    * Client for communicating with workers.
    */
   WorkerClient taskClientFor(Controller controller);
-
   /**
    * Writes controller task report.
    */

ControllerImpl.java

@@ -146,7 +146,7 @@ import org.apache.druid.msq.querykit.ShuffleSpecFactory;
 import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
 import org.apache.druid.msq.querykit.scan.ScanQueryKit;
 import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
-import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
 import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 import org.apache.druid.msq.util.DimensionSchemaUtils;
@@ -1202,7 +1202,6 @@ public class ControllerImpl implements Controller
     if (MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context())) {
       inputChannelFactory = DurableStorageInputChannelFactory.createStandardImplementation(
           id(),
-          () -> taskIds,
           MSQTasks.makeStorageConnector(context.injector()),
           closer
       );
@@ -1303,7 +1302,7 @@ public class ControllerImpl implements Controller
   private void cleanUpDurableStorageIfNeeded()
   {
     if (MultiStageQueryContext.isDurableStorageEnabled(task.getQuerySpec().getQuery().context())) {
-      final String controllerDirName = DurableStorageOutputChannelFactory.getControllerDirectory(task.getId());
+      final String controllerDirName = DurableStorageUtils.getControllerDirectory(task.getId());
       try {
         // Delete all temporary files as a failsafe
         MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(controllerDirName);

WorkerClient.java

@@ -36,7 +36,7 @@ public interface WorkerClient extends AutoCloseable
   /**
    * Worker's client method to add a {@link WorkOrder} to the worker to work on
    */
-  ListenableFuture<Void> postWorkOrder(String workerId, WorkOrder workOrder);
+  ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder workOrder);
 
   /**
    * Worker's client method to inform it of the partition boundaries for the given stage. This is usually invoked by the
@@ -50,13 +50,15 @@ public interface WorkerClient extends AutoCloseable
   /**
    * Worker's client method to inform that the work has been done, and it can initiate cleanup and shutdown
+   * @param workerTaskId
    */
-  ListenableFuture<Void> postFinish(String workerId);
+  ListenableFuture<Void> postFinish(String workerTaskId);
 
   /**
    * Fetches all the counters gathered by that worker
+   * @param workerTaskId
    */
-  ListenableFuture<CounterSnapshotsTree> getCounters(String workerId);
+  ListenableFuture<CounterSnapshotsTree> getCounters(String workerTaskId);
 
   /**
    * Worker's client method that informs it that the results and resources for the given stage are no longer required

WorkerImpl.java

@@ -102,6 +102,7 @@ import org.apache.druid.msq.kernel.worker.WorkerStagePhase;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
 import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
 import org.apache.druid.msq.shuffle.DurableStorageOutputChannelFactory;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
 import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
@@ -210,7 +211,12 @@ public class WorkerImpl implements Worker
       }
       catch (Throwable e) {
         maybeErrorReport = Optional.of(
-            MSQErrorReport.fromException(id(), MSQTasks.getHostFromSelfNode(selfDruidNode), null, e)
+            MSQErrorReport.fromException(
+                id(),
+                MSQTasks.getHostFromSelfNode(selfDruidNode),
+                null,
+                e
+            )
         );
       }
@@ -241,6 +247,7 @@ public class WorkerImpl implements Worker
     this.controllerClient = context.makeControllerClient(task.getControllerTaskId());
     closer.register(controllerClient::close);
     context.registerWorker(this, closer); // Uses controllerClient, so must be called after that is initialized
+
     this.workerClient = new ExceptionWrappingWorkerClient(context.makeWorkerClient());
     closer.register(workerClient::close);
@@ -583,7 +590,6 @@ public class WorkerImpl implements Worker
     if (durableStageStorageEnabled) {
       return DurableStorageInputChannelFactory.createStandardImplementation(
           task.getControllerTaskId(),
-          workerTaskList,
           MSQTasks.makeStorageConnector(context.injector()),
           closer
       );
@@ -601,8 +607,9 @@ public class WorkerImpl implements Worker
     if (durableStageStorageEnabled) {
       return DurableStorageOutputChannelFactory.createStandardImplementation(
           task.getControllerTaskId(),
-          id(),
+          task().getWorkerNumber(),
           stageNumber,
+          task().getId(),
           frameSize,
           MSQTasks.makeStorageConnector(context.injector())
       );
@@ -709,18 +716,18 @@ public class WorkerImpl implements Worker
     // Therefore, the logic for cleaning the stage output in case of a worker/machine crash has to be external.
     // We currently take care of this in the controller.
     if (durableStageStorageEnabled) {
-      final String fileName = DurableStorageOutputChannelFactory.getPartitionFileName(
+      final String folderName = DurableStorageUtils.getTaskIdOutputsFolderName(
           task.getControllerTaskId(),
-          task.getId(),
           stageId.getStageNumber(),
-          partition
+          task.getWorkerNumber(),
+          task.getId()
       );
       try {
-        MSQTasks.makeStorageConnector(context.injector()).deleteFile(fileName);
+        MSQTasks.makeStorageConnector(context.injector()).deleteRecursively(folderName);
       }
       catch (Exception e) {
         // If an error is thrown while cleaning up a file, log it and try to continue with the cleanup
-        log.warn(e, "Error while cleaning up temporary files at path " + fileName);
+        log.warn(e, "Error while cleaning up folder at path " + folderName);
       }
     }
   }
@@ -888,7 +895,38 @@ public class WorkerImpl implements Worker
     for (OutputChannel channel : outputChannels.getAllChannels()) {
       stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
                   .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
     }
+
+    if (durableStageStorageEnabled) {
+      // Once the output channels have been resolved and are ready for reading, the worker publishes a
+      // success marker file containing its task id, so readers know which attempt's outputs to consume.
+      DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
+          DurableStorageOutputChannelFactory.createStandardImplementation(
+              task.getControllerTaskId(),
+              task().getWorkerNumber(),
+              stageDef.getStageNumber(),
+              task().getId(),
+              frameContext.memoryParameters().getStandardFrameSize(),
+              MSQTasks.makeStorageConnector(context.injector())
+          );
+      try {
+        durableStorageOutputChannelFactory.createSuccessFile(task.getId());
+      }
+      catch (IOException e) {
+        throw new ISE(
+            e,
+            "Unable to create the success file [%s] at the location [%s]",
+            DurableStorageUtils.SUCCESS_MARKER_FILENAME,
+            DurableStorageUtils.getSuccessFilePath(
+                task.getControllerTaskId(),
+                stageDef.getStageNumber(),
+                task().getWorkerNumber()
+            )
+        );
+      }
+    }
+
     kernelManipulationQueue.add(holder -> holder.getStageKernelMap()
                                                 .get(stageDef.getId())
                                                 .setResultsComplete(resultObject));
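Because the output path now embeds both the worker number and the task id, a retried worker task writes under its own taskId_ folder, and readers never race with a failed attempt. A minimal sketch of the idea using the new helper; the query id, task ids, and numbers below are hypothetical placeholders:

    // The reader follows whatever task id the __success marker records, so a failed
    // attempt's folder is simply ignored (hypothetical ids throughout).
    String winner = "attempt-2"; // value read back from the __success file
    String path = DurableStorageUtils.getPartitionOutputsFileNameForPartition(
        "query-abc", // controller task id
        1,           // stage number
        2,           // worker number
        winner,      // task id of the successful attempt
        0            // partition number
    );
    // path -> "controller_query-abc/stage_1/worker_2/taskId_attempt-2/part_0"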

ControllerChatHandler.java

@@ -106,12 +106,11 @@ public class ControllerChatHandler implements ChatHandler
    * See {@link ControllerClient#postWorkerWarning} for the client-side code that calls this API.
    */
   @POST
-  @Path("/workerWarning/{taskId}")
+  @Path("/workerWarning")
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.APPLICATION_JSON)
   public Response httpPostWorkerWarning(
       final List<MSQErrorReport> errorReport,
-      @PathParam("taskId") final String taskId,
       @Context final HttpServletRequest req
   )
   {

IndexerControllerClient.java

@@ -122,12 +122,9 @@ public class IndexerControllerClient implements ControllerClient
   }
 
   @Override
-  public void postWorkerWarning(String workerId, List<MSQErrorReport> MSQErrorReports) throws IOException
+  public void postWorkerWarning(List<MSQErrorReport> MSQErrorReports) throws IOException
   {
-    final String path = StringUtils.format(
-        "/workerWarning/%s",
-        StringUtils.urlEncode(workerId)
-    );
+    final String path = "/workerWarning";
 
     doRequest(
         new RequestBuilder(HttpMethod.POST, path)

MSQWarningReportSimplePublisher.java

@@ -57,7 +57,7 @@ public class MSQWarningReportSimplePublisher implements MSQWarningReportPublisher
     final MSQErrorReport warningReport = MSQErrorReport.fromException(taskId, host, stageNumber, e);
 
     try {
-      controllerClient.postWorkerWarning(workerId, ImmutableList.of(warningReport));
+      controllerClient.postWorkerWarning(ImmutableList.of(warningReport));
     }
     catch (IOException e2) {
       throw new RuntimeException(e2);
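After this change the warning endpoint carries no task id in the URL; attribution travels inside the report itself. A minimal caller-side sketch grounded in the signatures above, where taskId, host, stageNumber, and the exception are assumed to be in scope:

    // Build the report with its own attribution, then post it without a workerId argument.
    MSQErrorReport warningReport = MSQErrorReport.fromException(taskId, host, stageNumber, e);
    controllerClient.postWorkerWarning(ImmutableList.of(warningReport));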

DurableStorageInputChannelFactory.java

@@ -20,6 +20,7 @@
 package org.apache.druid.msq.shuffle;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
 import org.apache.druid.java.util.common.IOE;
@@ -27,36 +28,36 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.indexing.InputChannelFactory;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.storage.StorageConnector;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.List;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Supplier;
 
 /**
  * Provides input channels connected to durable storage.
  */
 public class DurableStorageInputChannelFactory implements InputChannelFactory
 {
+  private static final Logger LOG = new Logger(DurableStorageInputChannelFactory.class);
+
   private final StorageConnector storageConnector;
   private final ExecutorService remoteInputStreamPool;
   private final String controllerTaskId;
-  private final Supplier<List<String>> taskList;
 
   public DurableStorageInputChannelFactory(
       final String controllerTaskId,
-      final Supplier<List<String>> taskList,
       final StorageConnector storageConnector,
       final ExecutorService remoteInputStreamPool
   )
   {
     this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId, "controllerTaskId");
-    this.taskList = Preconditions.checkNotNull(taskList, "taskList");
     this.storageConnector = Preconditions.checkNotNull(storageConnector, "storageConnector");
     this.remoteInputStreamPool = Preconditions.checkNotNull(remoteInputStreamPool, "remoteInputStreamPool");
   }
@@ -67,7 +68,6 @@ public class DurableStorageInputChannelFactory implements InputChannelFactory
    */
   public static DurableStorageInputChannelFactory createStandardImplementation(
       final String controllerTaskId,
-      final Supplier<List<String>> taskList,
       final StorageConnector storageConnector,
       final Closer closer
   )
@@ -75,28 +75,35 @@ public class DurableStorageInputChannelFactory implements InputChannelFactory
     final ExecutorService remoteInputStreamPool =
         Executors.newCachedThreadPool(Execs.makeThreadFactory(controllerTaskId + "-remote-fetcher-%d"));
     closer.register(remoteInputStreamPool::shutdownNow);
-    return new DurableStorageInputChannelFactory(controllerTaskId, taskList, storageConnector, remoteInputStreamPool);
+    return new DurableStorageInputChannelFactory(controllerTaskId, storageConnector, remoteInputStreamPool);
   }
 
   @Override
   public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int partitionNumber) throws IOException
   {
-    final String workerTaskId = taskList.get().get(workerNumber);
     try {
-      final String remotePartitionPath = DurableStorageOutputChannelFactory.getPartitionFileName(
+      final String remotePartitionPath = findSuccessfulPartitionOutput(
           controllerTaskId,
-          workerTaskId,
+          workerNumber,
           stageId.getStageNumber(),
           partitionNumber
       );
+      LOG.debug(
+          "Reading output of stage [%d], partition [%d] for worker [%d] from the file at path [%s]",
+          stageId.getStageNumber(),
+          partitionNumber,
+          workerNumber,
+          remotePartitionPath
+      );
       RetryUtils.retry(() -> {
         if (!storageConnector.pathExists(remotePartitionPath)) {
           throw new ISE(
-              "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-              workerTaskId,
+              "Could not find remote outputs of stage [%d] partition [%d] for worker [%d] at the path [%s]",
               stageId.getStageNumber(),
-              partitionNumber
+              partitionNumber,
+              workerNumber,
+              remotePartitionPath
           );
         }
         return Boolean.TRUE;
@@ -112,11 +119,64 @@ public class DurableStorageInputChannelFactory implements InputChannelFactory
     catch (Exception e) {
       throw new IOE(
          e,
-          "Could not find remote output of worker task[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Encountered error while reading the output of stage [%d], partition [%d] for worker [%d]",
          stageId.getStageNumber(),
-          partitionNumber
+          partitionNumber,
+          workerNumber
      );
    }
  }
+
+  /**
+   * Given a worker number, stage number, and partition number, figures out the exact location where the outputs
+   * are present in durable storage and returns the complete path, or throws an exception if no such file exists.
+   * More information at {@link DurableStorageOutputChannelFactory#createSuccessFile(String)}.
+   */
+  public String findSuccessfulPartitionOutput(
+      final String controllerTaskId,
+      final int workerNo,
+      final int stageNumber,
+      final int partitionNumber
+  ) throws IOException
+  {
+    String successfulFilePath = DurableStorageUtils.getSuccessFilePath(
+        controllerTaskId,
+        stageNumber,
+        workerNo
+    );
+
+    if (!storageConnector.pathExists(successfulFilePath)) {
+      throw new ISE(
+          "No file present at the location [%s]. Unable to read the outputs of stage [%d], partition [%d] for the worker [%d]",
+          successfulFilePath,
+          stageNumber,
+          partitionNumber,
+          workerNo
+      );
+    }
+
+    String successfulTaskId;
+    try (InputStream is = storageConnector.read(successfulFilePath)) {
+      successfulTaskId = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    if (successfulTaskId == null) {
+      throw new ISE("Unable to read the task id from the file: [%s]", successfulFilePath);
+    }
+    LOG.debug(
+        "Reading output of stage [%d], partition [%d] from task id [%s]",
+        stageNumber,
+        partitionNumber,
+        successfulTaskId
+    );
+
+    return DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNo,
+        successfulTaskId,
+        partitionNumber
+    );
+  }
 }
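A reader-side usage sketch of the new factory; the ids and numbers are hypothetical placeholders, and storageConnector and closer are assumed to be in scope. Note that callers now pass only a worker number: the worker task list is no longer needed because the factory resolves the successful task id from the __success file.

    // Open the durable-storage output of stage 1, worker 2, partition 3 (hypothetical ids).
    InputChannelFactory factory = DurableStorageInputChannelFactory.createStandardImplementation(
        "query-abc",      // controller task id
        storageConnector, // durable storage connector, assumed available
        closer            // assumed available
    );
    ReadableFrameChannel channel = factory.openChannel(new StageId("query-abc", 1), 2, 3);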

DurableStorageOutputChannelFactory.java

@@ -20,7 +20,6 @@
 package org.apache.druid.msq.shuffle;
 
 import com.google.common.base.Preconditions;
-import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
 import org.apache.druid.frame.channel.ReadableNilFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameFileChannel;
@@ -28,31 +27,39 @@ import org.apache.druid.frame.file.FrameFileWriter;
 import org.apache.druid.frame.processor.OutputChannel;
 import org.apache.druid.frame.processor.OutputChannelFactory;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.StorageConnector;
 
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
 
 public class DurableStorageOutputChannelFactory implements OutputChannelFactory
 {
+  private static final Logger LOG = new Logger(DurableStorageOutputChannelFactory.class);
+
   private final String controllerTaskId;
-  private final String workerTaskId;
+  private final int workerNumber;
   private final int stageNumber;
+  private final String taskId;
   private final int frameSize;
   private final StorageConnector storageConnector;
 
   public DurableStorageOutputChannelFactory(
       final String controllerTaskId,
-      final String workerTaskId,
+      final int workerNumber,
       final int stageNumber,
+      final String taskId,
       final int frameSize,
       final StorageConnector storageConnector
   )
   {
     this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId, "controllerTaskId");
-    this.workerTaskId = Preconditions.checkNotNull(workerTaskId, "workerTaskId");
+    this.workerNumber = workerNumber;
     this.stageNumber = stageNumber;
+    this.taskId = taskId;
     this.frameSize = frameSize;
     this.storageConnector = Preconditions.checkNotNull(storageConnector, "storageConnector");
   }
@@ -63,16 +70,18 @@ public class DurableStorageOutputChannelFactory implements OutputChannelFactory
    */
   public static DurableStorageOutputChannelFactory createStandardImplementation(
       final String controllerTaskId,
-      final String workerTaskId,
+      final int workerNumber,
       final int stageNumber,
+      final String taskId,
       final int frameSize,
       final StorageConnector storageConnector
   )
   {
     return new DurableStorageOutputChannelFactory(
         controllerTaskId,
-        workerTaskId,
+        workerNumber,
         stageNumber,
+        taskId,
         frameSize,
         storageConnector
     );
@@ -81,7 +90,13 @@ public class DurableStorageOutputChannelFactory implements OutputChannelFactory
   @Override
   public OutputChannel openChannel(int partitionNumber) throws IOException
   {
-    final String fileName = getPartitionFileName(controllerTaskId, workerTaskId, stageNumber, partitionNumber);
+    final String fileName = DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        partitionNumber
+    );
     final WritableFrameFileChannel writableChannel =
         new WritableFrameFileChannel(
             FrameFileWriter.open(
@@ -101,7 +116,13 @@ public class DurableStorageOutputChannelFactory implements OutputChannelFactory
   @Override
   public OutputChannel openNilChannel(int partitionNumber)
   {
-    final String fileName = getPartitionFileName(controllerTaskId, workerTaskId, stageNumber, partitionNumber);
+    final String fileName = DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        partitionNumber
+    );
     // As tasks dependent on output of this partition will forever block if no file is present in RemoteStorage. Hence, writing a dummy frame.
     try {
@@ -111,32 +132,30 @@ public class DurableStorageOutputChannelFactory implements OutputChannelFactory
     catch (IOException e) {
       throw new ISE(
           e,
-          "Unable to create empty remote output of workerTask[%s] stage[%d] partition[%d]",
-          workerTaskId,
+          "Unable to create empty remote output of stage [%d], partition [%d] for worker [%d]",
           stageNumber,
-          partitionNumber
+          partitionNumber,
+          workerNumber
       );
     }
   }
 
-  public static String getControllerDirectory(final String controllerTaskId)
-  {
-    return StringUtils.format("controller_%s", IdUtils.validateId("controller task ID", controllerTaskId));
-  }
-
-  public static String getPartitionFileName(
-      final String controllerTaskId,
-      final String workerTaskId,
-      final int stageNumber,
-      final int partitionNumber
-  )
-  {
-    return StringUtils.format(
-        "%s/worker_%s/stage_%d/part_%d",
-        getControllerDirectory(controllerTaskId),
-        IdUtils.validateId("worker task ID", workerTaskId),
-        stageNumber,
-        partitionNumber
-    );
-  }
+  /**
+   * Creates a file named __success containing the task id of the worker that has successfully written its outputs.
+   * While reading, this file can be used to find the task whose outputs are complete.
+   * A rename operation is not very quick in cloud storage like S3, which is why this alternative
+   * route has been taken.
+   * If the success file is already present at the location, this method is a no-op.
+   */
+  public void createSuccessFile(String taskId) throws IOException
+  {
+    String fileName = DurableStorageUtils.getSuccessFilePath(controllerTaskId, stageNumber, workerNumber);
+    if (storageConnector.pathExists(fileName)) {
+      LOG.warn("Path [%s] already exists. Won't attempt to rewrite on top of it.", fileName);
+      return;
+    }
+    OutputStreamWriter os = new OutputStreamWriter(storageConnector.write(fileName), StandardCharsets.UTF_8);
+    os.write(taskId); // Record the task id of the attempt that successfully wrote its outputs
+    os.close();
+  }
 }
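A writer-side lifecycle sketch under the same assumptions (hypothetical ids; storageConnector assumed in scope): each worker task writes partitions under its own taskId_ folder and only then publishes the marker, so readers never observe partially written outputs.

    DurableStorageOutputChannelFactory factory =
        DurableStorageOutputChannelFactory.createStandardImplementation(
            "query-abc",       // controller task id
            2,                 // worker number
            1,                 // stage number
            "worker-task-xyz", // this attempt's task id
            1_000_000,         // frame size, illustrative only
            storageConnector   // assumed available
        );
    OutputChannel channel = factory.openChannel(3); // writes .../taskId_worker-task-xyz/part_3
    // ... write frames and close the channel ...
    factory.createSuccessFile("worker-task-xyz");   // writes .../worker_2/__success with the task id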

DurableStorageUtils.java (new file)

@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.shuffle;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Helper class that computes the directory and file names used for durable storage locations.
+ */
+public class DurableStorageUtils
+{
+  public static final String SUCCESS_MARKER_FILENAME = "__success";
+
+  public static String getControllerDirectory(final String controllerTaskId)
+  {
+    return StringUtils.format("controller_%s", IdUtils.validateId("controller task ID", controllerTaskId));
+  }
+
+  public static String getSuccessFilePath(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber
+  )
+  {
+    String folderName = getWorkerOutputFolderName(
+        controllerTaskId,
+        stageNumber,
+        workerNumber
+    );
+    String fileName = StringUtils.format("%s/%s", folderName, SUCCESS_MARKER_FILENAME);
+    return fileName;
+  }
+
+  /**
+   * Fetches the directory location where workers will store the partition files corresponding to the stage number
+   */
+  public static String getWorkerOutputFolderName(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber
+  )
+  {
+    return StringUtils.format(
+        "%s/stage_%d/worker_%d",
+        getControllerDirectory(controllerTaskId),
+        stageNumber,
+        workerNumber
+    );
+  }
+
+  /**
+   * Fetches the directory location where a particular worker will store the partition files corresponding to the
+   * stage number and its task id
+   */
+  public static String getTaskIdOutputsFolderName(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber,
+      final String taskId
+  )
+  {
+    return StringUtils.format(
+        "%s/taskId_%s",
+        getWorkerOutputFolderName(controllerTaskId, stageNumber, workerNumber),
+        taskId
+    );
+  }
+
+  /**
+   * Fetches the file location where a particular worker writes the data corresponding to a particular stage
+   * and partition
+   */
+  public static String getPartitionOutputsFileNameForPartition(
+      final String controllerTaskId,
+      final int stageNumber,
+      final int workerNumber,
+      final String taskId,
+      final int partitionNumber
+  )
+  {
+    return StringUtils.format(
+        "%s/part_%d",
+        getTaskIdOutputsFolderName(controllerTaskId, stageNumber, workerNumber, taskId),
+        partitionNumber
+    );
+  }
+}
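To make the layout change concrete, here is a worked example with hypothetical ids, contrasting the removed task-id-keyed scheme with the paths the helpers above produce:

    // Old scheme (removed getPartitionFileName): keyed by the worker *task id*:
    //   controller_<controllerTaskId>/worker_<workerTaskId>/stage_<stage>/part_<partition>
    //
    // New scheme: keyed by the *worker number*, with the task id one level deeper,
    // plus a __success marker recording which attempt completed (hypothetical ids):
    String part = DurableStorageUtils.getPartitionOutputsFileNameForPartition("query-abc", 1, 2, "worker-task-xyz", 3);
    // -> "controller_query-abc/stage_1/worker_2/taskId_worker-task-xyz/part_3"
    String success = DurableStorageUtils.getSuccessFilePath("query-abc", 1, 2);
    // -> "controller_query-abc/stage_1/worker_2/__success"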

MSQSelectTest.java

@@ -31,6 +31,7 @@ import org.apache.druid.msq.indexing.ColumnMapping;
 import org.apache.druid.msq.indexing.ColumnMappings;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
+import org.apache.druid.msq.shuffle.DurableStorageUtils;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.QueryDataSource;
@@ -62,6 +63,8 @@ import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 
 import javax.annotation.Nonnull;
 import java.io.File;
@@ -1203,6 +1206,54 @@ public class MSQSelectTest extends MSQTestBase
                .verifyResults();
   }
 
+  @Test
+  public void testGroupByOnFooWithDurableStoragePathAssertions() throws IOException
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("cnt", ColumnType.LONG)
+                                            .add("cnt1", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("select cnt,count(*) as cnt1 from foo group by cnt")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(GroupByQuery.builder()
+                                      .setDataSource(CalciteTests.DATASOURCE1)
+                                      .setInterval(querySegmentSpec(Filtration.eternity()))
+                                      .setGranularity(Granularities.ALL)
+                                      .setDimensions(dimensions(
+                                          new DefaultDimensionSpec("cnt", "d0", ColumnType.LONG)
+                                      ))
+                                      .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                                      .setContext(DEFAULT_MSQ_CONTEXT)
+                                      .build())
+                   .columnMappings(
+                       new ColumnMappings(ImmutableList.of(
+                           new ColumnMapping("d0", "cnt"),
+                           new ColumnMapping("a0", "cnt1")
+                       ))
+                   )
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .build())
+        .setExpectedRowSignature(rowSignature)
+        .setExpectedResultRows(ImmutableList.of(new Object[]{1L, 6L}))
+        .verifyResults();
+
+    File successFile = new File(
+        localFileStorageDir,
+        DurableStorageUtils.getSuccessFilePath("query-test-query", 0, 0)
+    );
+
+    Mockito.verify(localFileStorageConnector, Mockito.times(2))
+           .write(ArgumentMatchers.endsWith("__success"));
+  }
+
   @Nonnull
   private List<Object[]> expectedMultiValueFooRowsGroup()
   {

MSQTaskReportTest.java

@@ -43,7 +43,6 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -175,7 +174,6 @@ public class MSQTaskReportTest
   }
 
   @Test
-  @Ignore("requires https://github.com/apache/druid/pull/12938")
   public void testWriteTaskReport() throws Exception
   {
     final MSQTaskReport report = new MSQTaskReport(
@@ -204,7 +202,9 @@ public class MSQTaskReportTest
     final Map<String, TaskReport> reportMap = mapper.readValue(
         reportFile,
-        new TypeReference<Map<String, TaskReport>>() {}
+        new TypeReference<Map<String, TaskReport>>()
+        {
+        }
     );
 
     final MSQTaskReport report2 = (MSQTaskReport) reportMap.get(MSQTaskReport.REPORT_KEY);

MSQTestBase.java

@@ -140,7 +140,7 @@ import org.apache.druid.sql.calcite.util.SqlTestFramework;
 import org.apache.druid.sql.calcite.view.InProcessViewManager;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.StorageConnectorProvider;
-import org.apache.druid.storage.local.LocalFileStorageConnectorProvider;
+import org.apache.druid.storage.local.LocalFileStorageConnector;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.PruneLoadSpec;
 import org.apache.druid.timeline.SegmentId;
@@ -218,6 +218,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
   public final boolean useDefault = NullHandling.replaceWithDefault();
 
   protected File localFileStorageDir;
+  protected LocalFileStorageConnector localFileStorageConnector;
   private static final Logger log = new Logger(MSQTestBase.class);
   private ObjectMapper objectMapper;
   private MSQTestOverlordServiceClient indexingServiceClient;
@@ -327,8 +328,11 @@ public class MSQTestBase extends BaseCalciteQueryTest
               MultiStageQuery.class
           );
           localFileStorageDir = tmpFolder.newFolder("fault");
+          localFileStorageConnector = Mockito.spy(
+              new LocalFileStorageConnector(localFileStorageDir)
+          );
           binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class))
-                .toProvider(new LocalFileStorageConnectorProvider(localFileStorageDir));
+                .toProvider(() -> localFileStorageConnector);
         }
         catch (IOException e) {
           throw new ISE(e, "Unable to create setup storage connector");

MSQTestControllerClient.java

@@ -75,7 +75,7 @@ public class MSQTestControllerClient implements ControllerClient
   }
 
   @Override
-  public void postWorkerWarning(String workerId, List<MSQErrorReport> MSQErrorReports)
+  public void postWorkerWarning(List<MSQErrorReport> MSQErrorReports)
   {
     controller.workerWarning(MSQErrorReports);
   }