mirror of https://github.com/apache/nifi.git
NIFI-730: do not interrupt swap thread if drop flowfiles is canceled
This commit is contained in:
parent
b8dbd1018c
commit
2b4999c018
|
@ -29,7 +29,6 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
|
||||||
private volatile QueueSize currentSize;
|
private volatile QueueSize currentSize;
|
||||||
private volatile QueueSize droppedSize = new QueueSize(0, 0L);
|
private volatile QueueSize droppedSize = new QueueSize(0, 0L);
|
||||||
private volatile long lastUpdated = System.currentTimeMillis();
|
private volatile long lastUpdated = System.currentTimeMillis();
|
||||||
private volatile Thread executionThread;
|
|
||||||
private volatile String failureReason;
|
private volatile String failureReason;
|
||||||
|
|
||||||
private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
|
private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
|
||||||
|
@ -101,20 +100,12 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
|
||||||
this.lastUpdated = System.currentTimeMillis();
|
this.lastUpdated = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
void setExecutionThread(final Thread thread) {
|
|
||||||
this.executionThread = thread;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized boolean cancel() {
|
synchronized boolean cancel() {
|
||||||
if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
|
if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.state = DropFlowFileState.CANCELED;
|
this.state = DropFlowFileState.CANCELED;
|
||||||
if (executionThread != null) {
|
|
||||||
executionThread.interrupt();
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1051,7 +1051,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
|
||||||
t.setDaemon(true);
|
t.setDaemon(true);
|
||||||
t.start();
|
t.start();
|
||||||
|
|
||||||
dropRequest.setExecutionThread(t);
|
|
||||||
dropRequestMap.put(requestIdentifier, dropRequest);
|
dropRequestMap.put(requestIdentifier, dropRequest);
|
||||||
|
|
||||||
return dropRequest;
|
return dropRequest;
|
||||||
|
|
Loading…
Reference in New Issue