Changeset 9351 in josm
- Timestamp:
- 2016-01-09T15:41:47+01:00 (9 years ago)
- Location:
- trunk/src/org/openstreetmap/josm
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/org/openstreetmap/josm/data/osm/MultipolygonBuilder.java
r9243 r9351 14 14 import java.util.concurrent.Callable; 15 15 import java.util.concurrent.ExecutionException; 16 import java.util.concurrent.ExecutorService; 16 import java.util.concurrent.ForkJoinPool; 17 import java.util.concurrent.ForkJoinTask; 17 18 import java.util.concurrent.Future; 19 import java.util.concurrent.RecursiveTask; 18 20 19 21 import org.openstreetmap.josm.tools.Geometry; … … 31 33 public class MultipolygonBuilder { 32 34 33 private static final Pair<Integer, ExecutorService>THREAD_POOL =34 Utils.new ThreadPool("multipolygon_creation.numberOfThreads", "multipolygon-builder-%d", Thread.NORM_PRIORITY);35 private static final ForkJoinPool THREAD_POOL = 36 Utils.newForkJoinPool("multipolygon_creation.numberOfThreads", "multipolygon-builder-%d", Thread.NORM_PRIORITY); 35 37 36 38 /** … … 303 305 */ 304 306 private static List<PolygonLevel> findOuterWaysMultiThread(List<JoinedPolygon> boundaryWays) { 305 final List<PolygonLevel> result = new ArrayList<>(); 306 final List<Worker> tasks = new ArrayList<>(); 307 final int bucketsize = Math.max(32, boundaryWays.size()/THREAD_POOL.a/3); 308 final int noBuckets = (boundaryWays.size() + bucketsize - 1) / bucketsize; 309 final boolean singleThread = THREAD_POOL.a == 1 || noBuckets == 1; 310 for (int i = 0; i < noBuckets; i++) { 311 int from = i*bucketsize; 312 int to = Math.min((i+1)*bucketsize, boundaryWays.size()); 313 List<PolygonLevel> target = singleThread ? result : new ArrayList<PolygonLevel>(to - from); 314 tasks.add(new Worker(boundaryWays, from, to, target)); 315 } 316 if (singleThread) { 317 try { 318 for (Worker task : tasks) { 319 if (task.call() == null) { 320 return null; 321 } 322 } 323 } catch (Exception ex) { 324 throw new RuntimeException(ex); 325 } 326 } else if (!tasks.isEmpty()) { 327 try { 328 for (Future<List<PolygonLevel>> future : THREAD_POOL.b.invokeAll(tasks)) { 329 List<PolygonLevel> res = future.get(); 330 if (res == null) { 331 return null; 332 } 333 result.addAll(res); 334 } 335 } catch (InterruptedException | ExecutionException ex) { 336 throw new RuntimeException(ex); 337 } 338 } 339 return result; 340 } 341 342 private static class Worker implements Callable<List<PolygonLevel>> { 307 return THREAD_POOL.invoke(new Worker(boundaryWays, 0, boundaryWays.size(), new ArrayList<PolygonLevel>(), 308 Math.max(32, boundaryWays.size() / THREAD_POOL.getParallelism() / 3))); 309 } 310 311 private static class Worker extends RecursiveTask<List<PolygonLevel>> { 343 312 344 313 private final List<JoinedPolygon> input; … … 346 315 private final int to; 347 316 private final List<PolygonLevel> output; 348 349 Worker(List<JoinedPolygon> input, int from, int to, List<PolygonLevel> output) { 317 private final int directExecutionTaskSize; 318 319 Worker(List<JoinedPolygon> input, int from, int to, List<PolygonLevel> output, int directExecutionTaskSize) { 350 320 this.input = input; 351 321 this.from = from; 352 322 this.to = to; 353 323 this.output = output; 324 this.directExecutionTaskSize = directExecutionTaskSize; 354 325 } 355 326 … … 407 378 408 379 @Override 409 public List<PolygonLevel> call() throws Exception { 380 protected List<PolygonLevel> compute() { 381 if (to - from < directExecutionTaskSize) { 382 return computeDirectly(); 383 } else { 384 final Collection<ForkJoinTask<List<PolygonLevel>>> tasks = new ArrayList<>(); 385 for (int fromIndex = from; fromIndex < to; fromIndex += directExecutionTaskSize) { 386 final List<PolygonLevel> output = new ArrayList<>(); 387 tasks.add(new Worker(input, fromIndex, Math.min(fromIndex + directExecutionTaskSize, to), output, directExecutionTaskSize)); 388 } 389 for (ForkJoinTask<List<PolygonLevel>> task : tasks) { 390 output.addAll(task.join()); 391 } 392 return output; 393 } 394 } 395 396 List<PolygonLevel> computeDirectly() { 410 397 for (int i = from; i < to; i++) { 411 398 if (processOuterWay(0, input, output, input.get(i)) == null) { -
trunk/src/org/openstreetmap/josm/data/osm/visitor/paint/StyledMapRenderer.java
r9284 r9351 33 33 import java.util.List; 34 34 import java.util.Map; 35 import java.util.concurrent.Callable; 36 import java.util.concurrent.ExecutionException; 37 import java.util.concurrent.ExecutorService; 38 import java.util.concurrent.Future; 35 import java.util.concurrent.ForkJoinPool; 36 import java.util.concurrent.ForkJoinTask; 37 import java.util.concurrent.RecursiveTask; 39 38 40 39 import javax.swing.AbstractButton; … … 78 77 import org.openstreetmap.josm.tools.Geometry.AreaAndPerimeter; 79 78 import org.openstreetmap.josm.tools.ImageProvider; 80 import org.openstreetmap.josm.tools.Pair;81 79 import org.openstreetmap.josm.tools.Utils; 82 80 … … 87 85 public class StyledMapRenderer extends AbstractMapRenderer { 88 86 89 private static final Pair<Integer, ExecutorService>THREAD_POOL =90 Utils.new ThreadPool("mappaint.StyledMapRenderer.style_creation.numberOfThreads", "styled-map-renderer-%d", Thread.NORM_PRIORITY);87 private static final ForkJoinPool THREAD_POOL = 88 Utils.newForkJoinPool("mappaint.StyledMapRenderer.style_creation.numberOfThreads", "styled-map-renderer-%d", Thread.NORM_PRIORITY); 91 89 92 90 /** … … 1761 1759 } 1762 1760 1763 private class ComputeStyleListWorker implements Callable<List<StyleRecord>>,Visitor {1761 private class ComputeStyleListWorker extends RecursiveTask<List<StyleRecord>> implements Visitor { 1764 1762 private final List<? extends OsmPrimitive> input; 1765 private final int from;1766 private final int to;1767 1763 private final List<StyleRecord> output; 1768 1764 1769 1765 private final ElemStyles styles = MapPaintStyles.getStyles(); 1766 private final int directExecutionTaskSize; 1770 1767 1771 1768 private final boolean drawArea = circum <= Main.pref.getInteger("mappaint.fillareas", 10000000); … … 1776 1773 * Constructs a new {@code ComputeStyleListWorker}. 1777 1774 * @param input the primitives to process 1778 * @param from first index of <code>input</code> to use1779 * @param to last index + 11780 1775 * @param output the list of styles to which styles will be added 1776 * @param directExecutionTaskSize the threshold deciding whether to subdivide the tasks 1781 1777 */ 1782 ComputeStyleListWorker(final List<? extends OsmPrimitive> input, int from, int to, List<StyleRecord> output) {1778 ComputeStyleListWorker(final List<? extends OsmPrimitive> input, List<StyleRecord> output, int directExecutionTaskSize) { 1783 1779 this.input = input; 1784 this.from = from;1785 this.to = to;1786 1780 this.output = output; 1781 this.directExecutionTaskSize = directExecutionTaskSize; 1787 1782 this.styles.setDrawMultipolygon(drawMultipolygon); 1788 1783 } 1789 1784 1790 1785 @Override 1791 public List<StyleRecord> call() throws Exception { 1786 protected List<StyleRecord> compute() { 1787 if (input.size() <= directExecutionTaskSize) { 1788 return computeDirectly(); 1789 } else { 1790 final Collection<ForkJoinTask<List<StyleRecord>>> tasks = new ArrayList<>(); 1791 for (int fromIndex = 0; fromIndex < input.size(); fromIndex += directExecutionTaskSize) { 1792 final int toIndex = Math.min(fromIndex + directExecutionTaskSize, input.size()); 1793 final List<StyleRecord> output = new ArrayList<>(directExecutionTaskSize); 1794 tasks.add(new ComputeStyleListWorker(input.subList(fromIndex, toIndex), output, directExecutionTaskSize).fork()); 1795 } 1796 for (ForkJoinTask<List<StyleRecord>> task : tasks) { 1797 output.addAll(task.join()); 1798 } 1799 return output; 1800 } 1801 } 1802 1803 public List<StyleRecord> computeDirectly() { 1792 1804 MapCSSStyleSource.STYLE_SOURCE_LOCK.readLock().lock(); 1793 1805 try { 1794 for (int i = from; i < to; i++) { 1795 OsmPrimitive osm = input.get(i); 1806 for (final OsmPrimitive osm : input) { 1796 1807 if (osm.isDrawable()) { 1797 1808 osm.accept(this); … … 1853 1864 } 1854 1865 1855 private class ConcurrentTasksHelper {1856 1857 private final List<StyleRecord> allStyleElems;1858 1859 ConcurrentTasksHelper(List<StyleRecord> allStyleElems) {1860 this.allStyleElems = allStyleElems;1861 }1862 1863 void process(List<? extends OsmPrimitive> prims) {1864 final List<ComputeStyleListWorker> tasks = new ArrayList<>();1865 final int bucketsize = Math.max(100, prims.size()/THREAD_POOL.a/3);1866 final int noBuckets = (prims.size() + bucketsize - 1) / bucketsize;1867 final boolean singleThread = THREAD_POOL.a == 1 || noBuckets == 1;1868 for (int i = 0; i < noBuckets; i++) {1869 int from = i*bucketsize;1870 int to = Math.min((i+1)*bucketsize, prims.size());1871 List<StyleRecord> target = singleThread ? allStyleElems : new ArrayList<StyleRecord>(to - from);1872 tasks.add(new ComputeStyleListWorker(prims, from, to, target));1873 }1874 if (singleThread) {1875 try {1876 for (ComputeStyleListWorker task : tasks) {1877 task.call();1878 }1879 } catch (Exception ex) {1880 throw new RuntimeException(ex);1881 }1882 } else if (!tasks.isEmpty()) {1883 try {1884 for (Future<List<StyleRecord>> future : THREAD_POOL.b.invokeAll(tasks)) {1885 allStyleElems.addAll(future.get());1886 }1887 } catch (InterruptedException | ExecutionException ex) {1888 throw new RuntimeException(ex);1889 }1890 }1891 }1892 }1893 1894 1866 @Override 1895 1867 public void render(final DataSet data, boolean renderVirtualNodes, Bounds bounds) { … … 1914 1886 final List<StyleRecord> allStyleElems = new ArrayList<>(nodes.size()+ways.size()+relations.size()); 1915 1887 1916 ConcurrentTasksHelper helper = new ConcurrentTasksHelper(allStyleElems);1917 1918 1888 // Need to process all relations first. 1919 1889 // Reason: Make sure, ElemStyles.getStyleCacheWithRange is … … 1921 1891 // (Could be synchronized, but try to avoid this for 1922 1892 // performance reasons.) 1923 helper.process(relations); 1924 helper.process(new CompositeList<>(nodes, ways)); 1893 THREAD_POOL.invoke(new ComputeStyleListWorker(relations, allStyleElems, 1894 Math.max(20, relations.size() / THREAD_POOL.getParallelism() / 3))); 1895 THREAD_POOL.invoke(new ComputeStyleListWorker(new CompositeList<>(nodes, ways), allStyleElems, 1896 Math.max(100, (nodes.size() + ways.size()) / THREAD_POOL.getParallelism() / 3))); 1925 1897 1926 1898 if (benchmark) { -
trunk/src/org/openstreetmap/josm/tools/Utils.java
r9297 r9351 51 51 import java.util.concurrent.ExecutorService; 52 52 import java.util.concurrent.Executors; 53 import java.util.concurrent.ForkJoinPool; 54 import java.util.concurrent.ForkJoinWorkerThread; 53 55 import java.util.concurrent.ThreadFactory; 54 56 import java.util.concurrent.atomic.AtomicLong; … … 1411 1413 1412 1414 /** 1413 * Returns a pair containing the number of threads (n), and a thread pool (if n > 1) to perform 1414 * multi-thread computation in the context of the given preference key. 1415 * @param pref The preference key 1415 * Returns a {@link ForkJoinPool} with the parallelism given by the preference key. 1416 * @param pref The preference key to determine parallelism 1416 1417 * @param nameFormat see {@link #newThreadFactory(String, int)} 1417 1418 * @param threadPriority see {@link #newThreadFactory(String, int)} 1418 * @return a pair containing the number of threads (n), and a thread pool (if n > 1, null otherwise) 1419 * @since 7423 1420 */ 1421 public static Pair<Integer, ExecutorService> newThreadPool(String pref, String nameFormat, int threadPriority) { 1419 * @return a {@link ForkJoinPool} 1420 */ 1421 public static ForkJoinPool newForkJoinPool(String pref, final String nameFormat, final int threadPriority) { 1422 1422 int noThreads = Main.pref.getInteger(pref, Runtime.getRuntime().availableProcessors()); 1423 ExecutorService pool = noThreads <= 1 ? null : Executors.newFixedThreadPool(noThreads, newThreadFactory(nameFormat, threadPriority)); 1424 return new Pair<>(noThreads, pool); 1423 return new ForkJoinPool(noThreads, new ForkJoinPool.ForkJoinWorkerThreadFactory() { 1424 final AtomicLong count = new AtomicLong(0); 1425 @Override 1426 public ForkJoinWorkerThread newThread(ForkJoinPool pool) { 1427 final ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); 1428 thread.setName(String.format(Locale.ENGLISH, nameFormat, count.getAndIncrement())); 1429 thread.setPriority(threadPriority); 1430 return thread; 1431 } 1432 }, null, true); 1425 1433 } 1426 1434
Note:
See TracChangeset
for help on using the changeset viewer.