discovered the cause of broker not being able to find segment issue

This commit is contained in:
fjy 2013-05-26 13:31:18 -06:00
parent e08770695f
commit 113db061a8
3 changed files with 28 additions and 11 deletions

View File

@ -127,6 +127,13 @@ public class DruidServer implements Comparable
public DruidServer addDataSegment(String segmentId, DataSegment segment)
{
synchronized (lock) {
DataSegment shouldNotExist = segments.get(segmentId);
if (shouldNotExist != null) {
log.warn("Asked to add data segment that already exists!? server[%s], segment[%s]", getName(), segmentId);
return this;
}
String dataSourceName = segment.getDataSource();
DruidDataSource dataSource = dataSources.get(dataSourceName);
@ -140,13 +147,6 @@ public class DruidServer implements Comparable
dataSource.addSegment(segmentId, segment);
DataSegment shouldNotExist = segments.get(segmentId);
if (shouldNotExist != null) {
log.warn("Asked to add data segment that already exists!? server[%s], segment[%s]", getName(), segmentId);
return this;
}
segments.put(segmentId, segment);
currSize += segment.getSize();
}

View File

@ -148,7 +148,19 @@ public class ServerInventoryView implements ServerView, InventoryView
@Override
public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
{
log.info("Server[%s] added segment[%s]", container.getName(), inventory);
log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
if (segment != null) {
log.warn(
"Not running callbacks for existing segment[%s] on server[%s]",
inventoryKey,
container.getName()
);
return container;
}
final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);
runSegmentCallbacks(
@ -170,7 +182,6 @@ public class ServerInventoryView implements ServerView, InventoryView
{
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
final DruidServer retVal = container.removeDataSegment(inventoryKey);
if (segment == null) {
log.warn(
@ -179,9 +190,11 @@ public class ServerInventoryView implements ServerView, InventoryView
container.getName()
);
return retVal;
return container;
}
final DruidServer retVal = container.removeDataSegment(inventoryKey);
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{

View File

@ -216,7 +216,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
containers.put(containerKey, new ContainerHolder(container, inventoryCache));
log.info("Starting inventory cache for %s", container);
log.info("Starting inventory cache for %s, inventoryPath %s", container, inventoryPath);
inventoryCache.start();
strategy.newContainer(container);
}
@ -233,6 +233,8 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
// This close() call actually calls shutdownNow() on the executor registered with the Cache object, it
// better have its own executor or ignore shutdownNow() calls...
log.info("Closing inventory cache for %s", containerKey);
removed.getCache().clear();
removed.getCache().close();
strategy.deadContainer(removed.getContainer());
@ -265,6 +267,8 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
{
this.containerKey = containerKey;
this.inventoryPath = inventoryPath;
log.info("Created new InventoryCacheListener for %s", inventoryPath);
}
@Override