mirror of https://github.com/apache/druid.git
made thread final
This commit is contained in:
parent
3250c698bb
commit
b3157c2752
|
@ -43,12 +43,42 @@ public class InputSupplierUpdateStream implements UpdateStream
|
||||||
private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
private final BlockingQueue<Map<String, Object>> queue = new ArrayBlockingQueue<Map<String, Object>>(QUEUE_SIZE);
|
||||||
private final ObjectMapper mapper = new DefaultObjectMapper();
|
private final ObjectMapper mapper = new DefaultObjectMapper();
|
||||||
private final String timeDimension;
|
private final String timeDimension;
|
||||||
private StoppableThread addToQueueThread;
|
private final StoppableThread addToQueueThread;
|
||||||
|
|
||||||
public InputSupplierUpdateStream(
|
public InputSupplierUpdateStream(
|
||||||
InputSupplier<BufferedReader> supplier,
|
final InputSupplier<BufferedReader> supplier,
|
||||||
String timeDimension
|
final String timeDimension
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
addToQueueThread = new StoppableThread()
|
||||||
|
{
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
while (!finished) {
|
||||||
|
try {
|
||||||
|
BufferedReader reader = supplier.getInput();
|
||||||
|
String line;
|
||||||
|
while ((line = reader.readLine()) != null) {
|
||||||
|
if (isValid(line)) {
|
||||||
|
HashMap<String, Object> map = mapper.readValue(line, typeRef);
|
||||||
|
if (map.get(timeDimension) != null) {
|
||||||
|
queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
|
||||||
|
log.debug("Successfully added to queue");
|
||||||
|
} else {
|
||||||
|
log.error("missing timestamp");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
catch (Exception e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
addToQueueThread.setDaemon(true);
|
||||||
|
|
||||||
this.supplier = supplier;
|
this.supplier = supplier;
|
||||||
this.typeRef = new TypeReference<HashMap<String, Object>>()
|
this.typeRef = new TypeReference<HashMap<String, Object>>()
|
||||||
{
|
{
|
||||||
|
@ -63,35 +93,12 @@ public class InputSupplierUpdateStream implements UpdateStream
|
||||||
|
|
||||||
public void start()
|
public void start()
|
||||||
{
|
{
|
||||||
addToQueueThread = new StoppableThread(){
|
|
||||||
public void run(){
|
|
||||||
while(!finished)
|
|
||||||
try {
|
|
||||||
BufferedReader reader = supplier.getInput();
|
|
||||||
String line;
|
|
||||||
while ((line = reader.readLine()) != null) {
|
|
||||||
if (isValid(line)) {
|
|
||||||
HashMap<String, Object> map = mapper.readValue(line, typeRef);
|
|
||||||
if (map.get(timeDimension) != null) {
|
|
||||||
queue.offer(map, queueWaitTime, TimeUnit.SECONDS);
|
|
||||||
log.debug("Successfully added to queue");
|
|
||||||
} else {
|
|
||||||
log.error("missing timestamp");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw Throwables.propagate(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
addToQueueThread.setDaemon(true);
|
|
||||||
addToQueueThread.start();
|
addToQueueThread.start();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop(){
|
public void stop()
|
||||||
|
{
|
||||||
addToQueueThread.stopMe();
|
addToQueueThread.stopMe();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +113,8 @@ public class InputSupplierUpdateStream implements UpdateStream
|
||||||
return queue.size();
|
return queue.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getTimeDimension(){
|
public String getTimeDimension()
|
||||||
|
{
|
||||||
return timeDimension;
|
return timeDimension;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue