diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 2be8a6a6a3b..7fe20ef9eab 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -79,6 +79,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer private final DruidNode node; private final ListeningExecutorService exec; private final ObjectMapper jsonMapper; + private final PortFinder portFinder; private final Map tasks = Maps.newHashMap(); @@ -97,6 +98,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer this.taskLogPusher = taskLogPusher; this.jsonMapper = jsonMapper; this.node = node; + this.portFinder = new PortFinder(config.getStartPort()); this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workerConfig.getCapacity())); } @@ -121,7 +123,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer final File attemptDir = new File(taskDir, attemptUUID); final ProcessHolder processHolder; - + final int childPort = portFinder.findUnusedPort(); try { final Closer closer = Closer.create(); try { @@ -154,7 +156,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer } final List command = Lists.newArrayList(); - final int childPort = findUnusedPort(); final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort); command.add(config.getJavaCommand()); @@ -258,7 +259,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer taskWorkItem.processHolder.process.destroy(); } } - + portFinder.markPortUnused(childPort); log.info("Removing temporary directory: %s", attemptDir); FileUtils.deleteDirectory(attemptDir); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java new file mode 100644 index 00000000000..7fbe7078478 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/PortFinder.java @@ -0,0 +1,94 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexing.overlord; + +import com.google.common.collect.Sets; +import com.metamx.common.ISE; + +import java.io.IOException; +import java.net.BindException; +import java.net.ServerSocket; +import java.util.Set; + +public class PortFinder +{ + private final Set usedPorts = Sets.newHashSet(); + private final int startPort; + + public PortFinder(int startPort) + { + this.startPort = startPort; + } + + private static boolean canBind(int portNum) + { + ServerSocket ss = null; + boolean isFree = false; + try { + ss = new ServerSocket(portNum); + isFree = true; + } + catch (BindException be) { + isFree = false; // port in use, + } + catch (IOException e) { + throw new RuntimeException(e); + } + finally { + if (ss != null) { + while (!ss.isClosed()) { + try { + ss.close(); + } + catch (IOException e) { + // ignore + } + } + } + } + return isFree; + } + + public synchronized int findUnusedPort() + { + int port = chooseNext(startPort); + while (!canBind(port)) { + port = chooseNext(port + 1); + } + usedPorts.add(port); + return port; + } + + public synchronized void markPortUnused(int port) + { + usedPorts.remove(port); + } + + private int chooseNext(int start) + { + for (int i = start; i < Integer.MAX_VALUE; i++) { + if (!usedPorts.contains(i)) { + return i; + } + } + throw new ISE("All ports are Used.."); + } +} + diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java new file mode 100644 index 00000000000..43fb72f05e4 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/PortFinderTest.java @@ -0,0 +1,34 @@ +package io.druid.indexing.overlord; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.net.ServerSocket; + +public class PortFinderTest +{ + private final PortFinder finder = new PortFinder(1200); + + @Test + public void testUsedPort() throws IOException + { + final int port1 = finder.findUnusedPort(); + // verify that the port is free + ServerSocket socket1 = new ServerSocket(port1); + finder.markPortUnused(port1); + final int port2 = finder.findUnusedPort(); + Assert.assertNotEquals("Used port is not reallocated", port1, port2); + // verify that port2 is free + ServerSocket socket2 = new ServerSocket(port2); + + socket1.close(); + // Now port1 should get recycled + Assert.assertEquals(port1, finder.findUnusedPort()); + + socket2.close(); + finder.markPortUnused(port1); + finder.markPortUnused(port2); + + } +}