mirror of https://github.com/apache/druid.git
fix bug where the curator config name was changed in one place but not another; make some info msgs into debug msgs; fix zkworker serialization
This commit is contained in:
parent
a95d9c46e2
commit
626cf14a6e
|
@ -685,7 +685,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
ZkWorker zkWorker, ZkWorker zkWorker2
|
ZkWorker zkWorker, ZkWorker zkWorker2
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
return -Ints.compare(zkWorker.getCurrCapacity(), zkWorker2.getCurrCapacity());
|
return -Ints.compare(zkWorker.getCurrCapacityUsed(), zkWorker2.getCurrCapacityUsed());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class ZkWorker implements Closeable
|
||||||
return worker;
|
return worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty("runningTasks")
|
||||||
public Map<String, TaskStatus> getRunningTasks()
|
public Map<String, TaskStatus> getRunningTasks()
|
||||||
{
|
{
|
||||||
Map<String, TaskStatus> retVal = Maps.newHashMap();
|
Map<String, TaskStatus> retVal = Maps.newHashMap();
|
||||||
|
@ -99,8 +99,8 @@ public class ZkWorker implements Closeable
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty("currCapacity")
|
@JsonProperty("currCapacityUsed")
|
||||||
public int getCurrCapacity()
|
public int getCurrCapacityUsed()
|
||||||
{
|
{
|
||||||
int currCapacity = 0;
|
int currCapacity = 0;
|
||||||
for (TaskStatus taskStatus : getRunningTasks().values()) {
|
for (TaskStatus taskStatus : getRunningTasks().values()) {
|
||||||
|
@ -132,12 +132,12 @@ public class ZkWorker implements Closeable
|
||||||
|
|
||||||
public boolean isAtCapacity()
|
public boolean isAtCapacity()
|
||||||
{
|
{
|
||||||
return getCurrCapacity() >= worker.getCapacity();
|
return getCurrCapacityUsed() >= worker.getCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean canRunTask(Task task)
|
public boolean canRunTask(Task task)
|
||||||
{
|
{
|
||||||
return (worker.getCapacity() - getCurrCapacity() >= task.getTaskResource().getRequiredCapacity()
|
return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
|
||||||
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
|
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
|
||||||
@Default("PT5M")
|
@Default("PT5M")
|
||||||
public abstract Duration getTaskAssignmentTimeoutDuration();
|
public abstract Duration getTaskAssignmentTimeoutDuration();
|
||||||
|
|
||||||
@Config("druid.curator.compression.enable")
|
@Config("druid.curator.compress")
|
||||||
@Default("false")
|
@Default("false")
|
||||||
public abstract boolean enableCompression();
|
public abstract boolean enableCompression();
|
||||||
|
|
||||||
|
|
|
@ -218,7 +218,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
log.info("Performing lookup: %s --> %s", ips, retVal);
|
log.debug("Performing lookup: %s --> %s", ips, retVal);
|
||||||
|
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
@ -250,7 +250,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
log.info("Performing lookup: %s --> %s", nodeIds, retVal);
|
log.debug("Performing lookup: %s --> %s", nodeIds, retVal);
|
||||||
|
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
|
||||||
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
|
public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
|
||||||
{
|
{
|
||||||
if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) {
|
if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) {
|
||||||
log.info(
|
log.debug(
|
||||||
"Cannot scale anymore. Num workers = %d, Max num workers = %d",
|
"Cannot scale anymore. Num workers = %d, Max num workers = %d",
|
||||||
zkWorkers.size(),
|
zkWorkers.size(),
|
||||||
workerSetupdDataRef.get().getMaxNumWorkers()
|
workerSetupdDataRef.get().getMaxNumWorkers()
|
||||||
|
|
Loading…
Reference in New Issue