Remove unused WorkerManagerClient interface. (#17073)

This commit is contained in:
Gian Merlino 2024-09-16 05:30:47 -07:00 committed by GitHub
parent 8630974157
commit 4d8015578d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 0 additions and 266 deletions

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import java.io.Closeable;
import java.util.Map;
import java.util.Set;
/**
* Generic interface to the "worker manager" mechanism which starts, cancels and monitors worker tasks.
*/
public interface WorkerManagerClient extends Closeable
{
String run(String taskId, MSQWorkerTask task);
/**
* @param workerId the task ID
*
* @return a {@code TaskLocation} associated with the task or
* {@code TaskLocation.unknown()} if no associated entry could be found
*/
TaskLocation location(String workerId);
/**
* Fetches status map corresponding to a group of task ids
*/
Map<String, TaskStatus> statuses(Set<String> taskIds);
/**
* Cancel the task corresponding to the provided workerId
*/
void cancel(String workerId);
@Override
void close();
}

View File

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.client;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.msq.exec.WorkerManagerClient;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.rpc.indexing.OverlordClient;
import java.util.Map;
import java.util.Set;
/**
* Worker manager client backed by the Indexer service. Glues together
* three different mechanisms to provide the single multi-stage query interface.
*/
public class IndexerWorkerManagerClient implements WorkerManagerClient
{
private final OverlordClient overlordClient;
private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
public IndexerWorkerManagerClient(final OverlordClient overlordClient)
{
this.overlordClient = overlordClient;
}
@Override
public String run(String taskId, MSQWorkerTask task)
{
FutureUtils.getUnchecked(overlordClient.runTask(taskId, task), true);
return taskId;
}
@Override
public void cancel(String taskId)
{
FutureUtils.getUnchecked(overlordClient.cancelTask(taskId), true);
}
@Override
public Map<String, TaskStatus> statuses(Set<String> taskIds)
{
return FutureUtils.getUnchecked(overlordClient.taskStatuses(taskIds), true);
}
@Override
public TaskLocation location(String workerId)
{
return locationFetcher.getLocation(workerId);
}
@Override
public void close()
{
// Nothing to do. The OverlordServiceClient is closed by the JVM lifecycle.
}
private class TaskLocationFetcher
{
TaskLocation getLocation(String workerId)
{
final TaskStatus taskStatus = FutureUtils.getUnchecked(
overlordClient.taskStatuses(ImmutableSet.of(workerId)),
true
).get(workerId);
if (taskStatus != null
&& !TaskLocation.unknown().equals(taskStatus.getLocation())) {
return taskStatus.getLocation();
}
// Retry with the single status API
final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
overlordClient.taskStatus(workerId),
true
);
if (statusResponse == null || statusResponse.getStatus() == null) {
return TaskLocation.unknown();
} else {
return statusResponse.getStatus().getLocation();
}
}
}
}

View File

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.indexing.client;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.Collections;
public class IndexerWorkerManagerClientTest
{
@Test
public void testGetLocationCallsMultiStatusApiByDefault()
{
final OverlordClient overlordClient = Mockito.mock(OverlordClient.class);
final String taskId = "worker1";
final TaskLocation expectedLocation = new TaskLocation("localhost", 1000, 1100, null);
Mockito.when(overlordClient.taskStatuses(Collections.singleton(taskId))).thenReturn(
Futures.immediateFuture(
Collections.singletonMap(
taskId,
new TaskStatus(taskId, TaskState.RUNNING, 100L, null, expectedLocation)
)
)
);
final IndexerWorkerManagerClient managerClient = new IndexerWorkerManagerClient(overlordClient);
Assert.assertEquals(managerClient.location(taskId), expectedLocation);
Mockito.verify(overlordClient, Mockito.times(1)).taskStatuses(ArgumentMatchers.anySet());
Mockito.verify(overlordClient, Mockito.never()).taskStatus(ArgumentMatchers.anyString());
}
@Test
public void testGetLocationFallsBackToSingleTaskApiIfLocationIsUnknown()
{
final OverlordClient overlordClient = Mockito.mock(OverlordClient.class);
final String taskId = "worker1";
Mockito.when(overlordClient.taskStatuses(Collections.singleton(taskId))).thenReturn(
Futures.immediateFuture(
Collections.singletonMap(
taskId,
new TaskStatus(taskId, TaskState.RUNNING, 100L, null, TaskLocation.unknown())
)
)
);
final TaskLocation expectedLocation = new TaskLocation("localhost", 1000, 1100, null);
final TaskStatusPlus taskStatus = new TaskStatusPlus(
taskId,
null,
null,
DateTimes.nowUtc(),
DateTimes.nowUtc(),
TaskState.RUNNING,
null,
100L,
expectedLocation,
"wiki",
null
);
Mockito.when(overlordClient.taskStatus(taskId)).thenReturn(
Futures.immediateFuture(new TaskStatusResponse(taskId, taskStatus))
);
final IndexerWorkerManagerClient managerClient = new IndexerWorkerManagerClient(overlordClient);
Assert.assertEquals(managerClient.location(taskId), expectedLocation);
Mockito.verify(overlordClient, Mockito.times(1)).taskStatuses(ArgumentMatchers.anySet());
Mockito.verify(overlordClient, Mockito.times(1)).taskStatus(ArgumentMatchers.anyString());
}
}