mirror of https://github.com/apache/druid.git
Reduce excessive logging (#4680)
* Reduce excessive logging * Refactoring code as per comments
This commit is contained in:
parent
ab28dc3b97
commit
dba7c7d3cd
|
@ -193,99 +193,101 @@ public class LoadQueuePeon
|
|||
|
||||
private void processSegmentChangeRequest()
|
||||
{
|
||||
if (currentlyProcessing == null) {
|
||||
if (!segmentsToDrop.isEmpty()) {
|
||||
currentlyProcessing = segmentsToDrop.firstEntry().getValue();
|
||||
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
} else if (!segmentsToLoad.isEmpty()) {
|
||||
currentlyProcessing = segmentsToLoad.firstEntry().getValue();
|
||||
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
} else {
|
||||
if (currentlyProcessing != null) {
|
||||
log.debug(
|
||||
"Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
|
||||
basePath,
|
||||
currentlyProcessing.getSegmentIdentifier()
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!segmentsToDrop.isEmpty()) {
|
||||
currentlyProcessing = segmentsToDrop.firstEntry().getValue();
|
||||
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
} else if (!segmentsToLoad.isEmpty()) {
|
||||
currentlyProcessing = segmentsToLoad.firstEntry().getValue();
|
||||
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (currentlyProcessing == null) {
|
||||
if (!stopped) {
|
||||
log.makeAlert("Crazy race condition! server[%s]", basePath)
|
||||
.emit();
|
||||
}
|
||||
actionCompleted();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (currentlyProcessing == null) {
|
||||
if (!stopped) {
|
||||
log.makeAlert("Crazy race condition! server[%s]", basePath)
|
||||
.emit();
|
||||
}
|
||||
actionCompleted();
|
||||
return;
|
||||
}
|
||||
log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
|
||||
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
|
||||
|
||||
log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
|
||||
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
|
||||
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
|
||||
|
||||
processingExecutor.schedule(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
if (curator.checkExists().forPath(path) != null) {
|
||||
failAssign(new ISE("%s was never removed! Failing this operation!", path));
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
failAssign(e);
|
||||
}
|
||||
}
|
||||
},
|
||||
config.getLoadTimeoutDelay().getMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
|
||||
final Stat stat = curator.checkExists().usingWatcher(
|
||||
new CuratorWatcher()
|
||||
{
|
||||
@Override
|
||||
public void process(WatchedEvent watchedEvent) throws Exception
|
||||
{
|
||||
switch (watchedEvent.getType()) {
|
||||
case NodeDeleted:
|
||||
entryRemoved(watchedEvent.getPath());
|
||||
break;
|
||||
default:
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
processingExecutor.schedule(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
if (curator.checkExists().forPath(path) != null) {
|
||||
failAssign(new ISE("%s was never removed! Failing this operation!", path));
|
||||
}
|
||||
).forPath(path);
|
||||
|
||||
if (stat == null) {
|
||||
final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
|
||||
|
||||
// Create a node and then delete it to remove the registered watcher. This is a work-around for
|
||||
// a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
|
||||
// that happens for that node. If no events happen, the watcher stays registered foreverz.
|
||||
// Couple that with the fact that you cannot set a watcher when you create a node, but what we
|
||||
// want is to create a node and then watch for it to get deleted. The solution is that you *can*
|
||||
// set a watcher when you check to see if it exists so, we first create the node and then set a
|
||||
// watcher on its existence. However, if already does not exist by the time the existence check
|
||||
// returns, then the watcher that was set will never fire (nobody will ever create the node
|
||||
// again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
|
||||
// that watcher to fire and delete it right away.
|
||||
//
|
||||
// We do not create the existence watcher first, because then it will fire when we create the
|
||||
// node and we'll have the same race when trying to refresh that watcher.
|
||||
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
|
||||
|
||||
entryRemoved(path);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
failAssign(e);
|
||||
}
|
||||
} else {
|
||||
log.info(
|
||||
"Server[%s] skipping doNext() because something is currently loading[%s].",
|
||||
basePath,
|
||||
currentlyProcessing.getSegmentIdentifier()
|
||||
}
|
||||
catch (Exception e) {
|
||||
failAssign(e);
|
||||
}
|
||||
}
|
||||
},
|
||||
config.getLoadTimeoutDelay().getMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
|
||||
final Stat stat = curator.checkExists().usingWatcher(
|
||||
new CuratorWatcher()
|
||||
{
|
||||
@Override
|
||||
public void process(WatchedEvent watchedEvent) throws Exception
|
||||
{
|
||||
switch (watchedEvent.getType()) {
|
||||
case NodeDeleted:
|
||||
entryRemoved(watchedEvent.getPath());
|
||||
break;
|
||||
default:
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
).forPath(path);
|
||||
|
||||
if (stat == null) {
|
||||
final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
|
||||
|
||||
// Create a node and then delete it to remove the registered watcher. This is a work-around for
|
||||
// a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
|
||||
// that happens for that node. If no events happen, the watcher stays registered foreverz.
|
||||
// Couple that with the fact that you cannot set a watcher when you create a node, but what we
|
||||
// want is to create a node and then watch for it to get deleted. The solution is that you *can*
|
||||
// set a watcher when you check to see if it exists so, we first create the node and then set a
|
||||
// watcher on its existence. However, if already does not exist by the time the existence check
|
||||
// returns, then the watcher that was set will never fire (nobody will ever create the node
|
||||
// again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
|
||||
// that watcher to fire and delete it right away.
|
||||
//
|
||||
// We do not create the existence watcher first, because then it will fire when we create the
|
||||
// node and we'll have the same race when trying to refresh that watcher.
|
||||
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
|
||||
|
||||
entryRemoved(path);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
failAssign(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue