Set task location as k8sPodName for mm-less ingestion (#14959)

* Set task location as k8sPodName for mm-less ingestion

* tests
This commit is contained in:
Suneet Saldanha 2023-09-11 19:44:26 -07:00 committed by GitHub
parent f773d83914
commit 757603a773
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 17 deletions

View File

@ -20,8 +20,10 @@
package org.apache.druid.indexer; package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.net.HostAndPort;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -102,6 +104,25 @@ public class TaskLocation
return k8sPodName; return k8sPodName;
} }
@JsonIgnore
@Nullable
public String getLocation()
{
if (k8sPodName != null) {
return k8sPodName;
} else if (host == null) {
return null;
} else {
final int thePort;
if (tlsPort >= 0) {
thePort = tlsPort;
} else {
thePort = port;
}
return HostAndPort.fromParts(host, thePort).toString();
}
}
public URL makeURL(final String encodedPathAndQueryString) throws MalformedURLException public URL makeURL(final String encodedPathAndQueryString) throws MalformedURLException
{ {
final String scheme; final String scheme;

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexer; package org.apache.druid.indexer;
import com.google.common.net.HostAndPort;
import nl.jqno.equalsverifier.EqualsVerifier; import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -75,4 +76,46 @@ public class TaskLocationTest
{ {
EqualsVerifier.forClass(TaskLocation.class).usingGetClass().verify(); EqualsVerifier.forClass(TaskLocation.class).usingGetClass().verify();
} }
@Test
public void testGetLocationWithK8sPodNameShouldReturnK8sPodName()
{
TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, false, "job-name");
Assert.assertEquals("job-name", taskLocation.getLocation());
}
@Test
public void testGetLocationWithK8sPodNameAndTlsShouldReturnK8sPodName()
{
TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, true, "job-name");
Assert.assertEquals("job-name", taskLocation.getLocation());
}
@Test
public void testGetLocationWithK8sPodNameAndNoHostShouldReturnK8sPodName()
{
TaskLocation taskLocation = TaskLocation.create(null, 1, 2, true, "job-name");
Assert.assertEquals("job-name", taskLocation.getLocation());
}
@Test
public void testGetLocationWithoutK8sPodNameAndHostShouldReturnNull()
{
TaskLocation taskLocation = TaskLocation.create(null, 1, 2, false);
Assert.assertNull(taskLocation.getLocation());
}
@Test
public void testGetLocationWithoutK8sPodNameAndNoTlsPortShouldReturnLocation()
{
TaskLocation taskLocation = TaskLocation.create("foo", 1, -1, false);
Assert.assertEquals(HostAndPort.fromParts("foo", 1).toString(), taskLocation.getLocation());
}
@Test
public void testGetLocationWithoutK8sPodNameAndNonZeroTlsPortShouldReturnLocation()
{
TaskLocation taskLocation = TaskLocation.create("foo", 1, 2, true);
Assert.assertEquals(HostAndPort.fromParts("foo", 2).toString(), taskLocation.getLocation());
}
} }

View File

@ -30,7 +30,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.calcite.DataContext; import org.apache.calcite.DataContext;
@ -822,21 +821,7 @@ public class SystemSchema extends AbstractSchema
public Object[] current() public Object[] current()
{ {
final TaskStatusPlus task = it.next(); final TaskStatusPlus task = it.next();
@Nullable final String host = task.getLocation().getHost();
@Nullable final String hostAndPort;
if (host == null) {
hostAndPort = null;
} else {
final int port;
if (task.getLocation().getTlsPort() >= 0) {
port = task.getLocation().getTlsPort();
} else {
port = task.getLocation().getPort();
}
hostAndPort = HostAndPort.fromParts(host, port).toString();
}
return new Object[]{ return new Object[]{
task.getId(), task.getId(),
task.getGroupId(), task.getGroupId(),
@ -847,8 +832,8 @@ public class SystemSchema extends AbstractSchema
toStringOrNull(task.getStatusCode()), toStringOrNull(task.getStatusCode()),
toStringOrNull(task.getRunnerStatusCode()), toStringOrNull(task.getRunnerStatusCode()),
task.getDuration() == null ? 0L : task.getDuration(), task.getDuration() == null ? 0L : task.getDuration(),
hostAndPort, task.getLocation().getLocation(),
host, task.getLocation().getHost(),
(long) task.getLocation().getPort(), (long) task.getLocation().getPort(),
(long) task.getLocation().getTlsPort(), (long) task.getLocation().getTlsPort(),
task.getErrorMsg() task.getErrorMsg()