mirror of https://github.com/apache/druid.git
Remove Query ID verification check from MSQ workers (#16886)
Upgrade/Downgrade between any version till or before Druid 30 where the newer version runs a worker task, while the older version runs a controller task can fail. The patch removes that verification check till its safe to add it back.
This commit is contained in:
parent
acadc2df20
commit
204533cade
|
@ -1100,7 +1100,7 @@ public class WorkerImpl implements Worker
|
||||||
*/
|
*/
|
||||||
public void addKernel(final WorkerStageKernel kernel)
|
public void addKernel(final WorkerStageKernel kernel)
|
||||||
{
|
{
|
||||||
final StageId stageId = verifyQueryId(kernel.getWorkOrder().getStageDefinition().getId());
|
final StageId stageId = kernel.getWorkOrder().getStageDefinition().getId();
|
||||||
|
|
||||||
if (holderMap.putIfAbsent(stageId.getStageNumber(), new KernelHolder(kernel)) != null) {
|
if (holderMap.putIfAbsent(stageId.getStageNumber(), new KernelHolder(kernel)) != null) {
|
||||||
// Already added. Do nothing.
|
// Already added. Do nothing.
|
||||||
|
@ -1116,7 +1116,7 @@ public class WorkerImpl implements Worker
|
||||||
*/
|
*/
|
||||||
public void finishProcessing(final StageId stageId)
|
public void finishProcessing(final StageId stageId)
|
||||||
{
|
{
|
||||||
final KernelHolder kernel = holderMap.get(verifyQueryId(stageId).getStageNumber());
|
final KernelHolder kernel = holderMap.get(stageId.getStageNumber());
|
||||||
|
|
||||||
if (kernel != null) {
|
if (kernel != null) {
|
||||||
try {
|
try {
|
||||||
|
@ -1137,7 +1137,7 @@ public class WorkerImpl implements Worker
|
||||||
*/
|
*/
|
||||||
public void removeKernel(final StageId stageId)
|
public void removeKernel(final StageId stageId)
|
||||||
{
|
{
|
||||||
final KernelHolder removed = holderMap.remove(verifyQueryId(stageId).getStageNumber());
|
final KernelHolder removed = holderMap.remove(stageId.getStageNumber());
|
||||||
|
|
||||||
if (removed == null) {
|
if (removed == null) {
|
||||||
throw new ISE("No kernel for stage[%s]", stageId);
|
throw new ISE("No kernel for stage[%s]", stageId);
|
||||||
|
@ -1191,7 +1191,7 @@ public class WorkerImpl implements Worker
|
||||||
@Nullable
|
@Nullable
|
||||||
public WorkerStageKernel getKernelFor(final StageId stageId)
|
public WorkerStageKernel getKernelFor(final StageId stageId)
|
||||||
{
|
{
|
||||||
final KernelHolder holder = holderMap.get(verifyQueryId(stageId).getStageNumber());
|
final KernelHolder holder = holderMap.get(stageId.getStageNumber());
|
||||||
if (holder != null) {
|
if (holder != null) {
|
||||||
return holder.kernel;
|
return holder.kernel;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1240,15 +1240,6 @@ public class WorkerImpl implements Worker
|
||||||
{
|
{
|
||||||
this.done = true;
|
this.done = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private StageId verifyQueryId(final StageId stageId)
|
|
||||||
{
|
|
||||||
if (!stageId.getQueryId().equals(workerContext.queryId())) {
|
|
||||||
throw new ISE("Unexpected queryId[%s], expected queryId[%s]", stageId.getQueryId(), workerContext.queryId());
|
|
||||||
}
|
|
||||||
|
|
||||||
return stageId;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -31,6 +31,9 @@ import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Globally unique stage identifier: query ID plus stage number.
|
* Globally unique stage identifier: query ID plus stage number.
|
||||||
|
*
|
||||||
|
* Note: Versions till Druid 30 had a bug in the QueryKits which populated the {@link #queryId} field with random
|
||||||
|
* UUIDs. Therefore, all usage of the field must be vetted instead of assuming that it will be the expected query id
|
||||||
*/
|
*/
|
||||||
public class StageId implements Comparable<StageId>
|
public class StageId implements Comparable<StageId>
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue