Changeset 8314 in josm


Ignore:
Timestamp:
2015-05-02T22:53:56+02:00 (4 years ago)
Author:
bastiK
Message:

applied #10902 - TMS simultaneous connections (2nd patch by wiktorn)

Location:
trunk/src/org/openstreetmap/josm/data
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/org/openstreetmap/josm/data/cache/JCSCachedTileLoaderJob.java

    r8291 r8314  
    5656     * maximum download threads that will be started
    5757     */
    58     public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10);
     58    public final static IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10);
     59
     60    public static class LIFOQueue extends LinkedBlockingDeque<Runnable> {
     61        public LIFOQueue(int capacity) {
     62            super(capacity);
     63        }
     64
     65        @Override
     66        public boolean offer(Runnable t) {
     67            return super.offerFirst(t);
     68        }
     69
     70        @Override
     71        public Runnable remove() {
     72            return super.removeFirst();
     73        }
     74    }
     75
     76
     77    /*
     78     * ThreadPoolExecutor starts new threads, until THREAD_LIMIT is reached. Then it puts tasks into LIFOQueue, which is fairly
     79     * small, but we do not want a lot of outstanding tasks queued, but rather prefer the class consumer to resubmit the task, which are
     80     * important right now.
     81     *
     82     * This way, if some task gets outdated (for example - user paned the map, and we do not want to download this tile any more),
     83     * the task will not be resubmitted, and thus - never queued.
     84     *
     85     * There is no point in canceling tasks, that are already taken by worker threads (if we made so much effort, we can at least cache
     86     * the response, so later it could be used). We could actually cancel what is in LIFOQueue, but this is a tradeoff between simplicity
     87     * and performance (we do want to have something to offer to worker threads before tasks will be resubmitted by class consumer)
     88     */
    5989    private static Executor DOWNLOAD_JOB_DISPATCHER = new ThreadPoolExecutor(
    6090            2, // we have a small queue, so threads will be quickly started (threads are started only, when queue is full)
     
    6393            TimeUnit.SECONDS,
    6494            // make queue of LIFO type - so recently requested tiles will be loaded first (assuming that these are which user is waiting to see)
    65             new LinkedBlockingDeque<Runnable>(5) {
    66                 /* keep the queue size fairly small, we do not want to
    67                  download a lot of tiles, that user is not seeing anyway */
    68                 @Override
    69                 public boolean offer(Runnable t) {
    70                     return super.offerFirst(t);
    71                 }
    72 
    73                 @Override
    74                 public Runnable remove() {
    75                     return super.removeFirst();
    76                 }
    77             }
     95            new LIFOQueue(5)
    7896            );
     97
    7998    private static ConcurrentMap<String,Set<ICachedLoaderListener>> inProgress = new ConcurrentHashMap<>();
    8099    private static ConcurrentMap<String, Boolean> useHead = new ConcurrentHashMap<>();
     
    157176            // object not in cache, so submit work to separate thread
    158177            try {
    159                 // use getter method, so subclasses may override executors, to get separate thread pool
    160                 getDownloadExecutor().execute(JCSCachedTileLoaderJob.this);
     178                if (executionGuard()) {
     179                    // use getter method, so subclasses may override executors, to get separate thread pool
     180                    getDownloadExecutor().execute(this);
     181                } else {
     182                    log.log(Level.FINE, "JCS - guard rejected job for: {0}", getCacheKey());
     183                    finishLoading(LoadResult.REJECTED);
     184                }
    161185            } catch (RejectedExecutionException e) {
    162186                // queue was full, try again later
     
    165189            }
    166190        }
     191    }
     192
     193    /**
     194     * Guard method for execution. If guard returns true, the execution of download task will commence
     195     * otherwise, execution will finish with result LoadResult.REJECTED
     196     *
     197     * It is responsibility of the overriding class, to handle properly situation in finishLoading class
     198     * @return
     199     */
     200    protected boolean executionGuard() {
     201        return true;
     202    }
     203
     204    /**
     205     * This method is run when job has finished
     206     */
     207    protected void executionFinished() {
    167208    }
    168209
     
    219260            }
    220261        } finally {
     262            executionFinished();
    221263            currentThread.setName(oldName);
    222264        }
  • trunk/src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoaderJob.java

    r8307 r8314  
    88import java.util.concurrent.ConcurrentHashMap;
    99import java.util.concurrent.Executor;
    10 import java.util.concurrent.LinkedBlockingDeque;
    1110import java.util.concurrent.Semaphore;
    1211import java.util.concurrent.ThreadPoolExecutor;
     
    4241
    4342    /**
     43     * Limit definition for per host concurrent connections
     44     */
     45    public final static IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
     46
     47     /*
     48     * Host limit guards the area - between submission to the queue up to loading is finished. It uses executionGuard method
     49     * from JCSCachedTileLoaderJob to acquire the semaphore, and releases it - when loadingFinished is called (but not when
     50     * LoadResult.GUARD_REJECTED is set)
     51     *
     52     */
     53
     54    private Semaphore getSemaphore() {
     55        String host = getUrl().getHost();
     56        Semaphore limit = HOST_LIMITS.get(host);
     57        if (limit == null) {
     58            synchronized(HOST_LIMITS) {
     59                limit = HOST_LIMITS.get(host);
     60                if (limit == null) {
     61                    limit = new Semaphore(HOST_LIMIT.get().intValue());
     62                    HOST_LIMITS.put(host, limit);
     63                }
     64            }
     65        }
     66        return limit;
     67    }
     68
     69    private boolean acquireSemaphore() {
     70        boolean ret = true;
     71        Semaphore limit = getSemaphore();
     72        if (limit != null) {
     73            ret = limit.tryAcquire();
     74            if (!ret) {
     75                Main.debug("rejecting job because of per host limit");
     76            }
     77        }
     78        return ret;
     79
     80    }
     81
     82    private void releaseSemaphore() {
     83        Semaphore limit = getSemaphore();
     84        if (limit != null) {
     85            limit.release();
     86        }
     87    }
     88
     89
     90    private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>();
     91
     92    /**
    4493     * overrides the THREAD_LIMIT in superclass, as we want to have separate limit and pool for TMS
    4594     */
    46     public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25);
    47 
    48     /**
    49      * Limit definition for per host concurrent connections
    50      */
    51     public static final IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
    52 
    53 
    54     private static class LIFOQueue extends LinkedBlockingDeque<Runnable> {
    55         public LIFOQueue(int capacity) {
    56             super(capacity);
    57         }
    58 
    59         private final static Semaphore getSemaphore(Runnable r) {
    60             if (!(r instanceof TMSCachedTileLoaderJob))
    61                 return null;
    62             TMSCachedTileLoaderJob cachedJob = (TMSCachedTileLoaderJob) r;
    63             Semaphore limit = HOST_LIMITS.get(cachedJob.getUrl().getHost());
    64             if (limit == null) {
    65                 synchronized(HOST_LIMITS) {
    66                     limit = HOST_LIMITS.get(cachedJob.getUrl().getHost());
    67                     if (limit == null) {
    68                         limit = new Semaphore(HOST_LIMIT.get().intValue());
    69                         HOST_LIMITS.put(cachedJob.getUrl().getHost(), limit);
    70                     }
    71                 }
    72             }
    73             return limit;
    74         }
    75 
    76         private boolean acquireSemaphore(Runnable r) {
    77             boolean ret = true;
    78             Semaphore limit = getSemaphore(r);
    79             if (limit != null) {
    80                 ret = limit.tryAcquire();
    81                 if (!ret) {
    82                     Main.debug("rejecting job because of per host limit");
    83                 }
    84             }
    85             return ret;
    86         }
    87 
    88         @Override
    89         public boolean offer(Runnable t) {
    90             return acquireSemaphore(t) && super.offerFirst(t);
    91         }
    92 
    93         private Runnable releaseSemaphore(Runnable r) {
    94             Semaphore limit = getSemaphore(r);
    95             if (limit != null)
    96                 limit.release();
    97             return r;
    98         }
    99 
    100         @Override
    101         public Runnable remove() {
    102             return releaseSemaphore(super.removeFirst());
    103         }
    104 
    105         @Override
    106         public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    107             return releaseSemaphore(super.poll(timeout, unit));
    108         }
    109 
    110         @Override
    111         public Runnable take() throws InterruptedException {
    112             return releaseSemaphore(super.take());
    113         }
    114     }
    115 
    116     private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>();
     95    public final static IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25);
    11796
    11897    /**
     
    143122    }
    144123
    145 
    146124    /**
    147125     * Constructor for creating a job, to get a specific tile from cache
     
    175153     *  this doesn't needs to be synchronized, as it's not that costly to keep only one execution
    176154     *  in parallel, but URL creation and Tile.getUrl() are costly and are not needed when fetching
    177      *  data from cache
     155     *  data from cache, that's why URL creation is postponed until it's needed
    178156     *
    179157     *  We need to have static url value for TileLoaderJob, as for some TileSources we might get different
     
    234212    }
    235213
     214    @Override
     215    protected boolean executionGuard() {
     216        return acquireSemaphore();
     217    }
     218
     219    @Override
     220    protected void executionFinished() {
     221        releaseSemaphore();
     222    }
     223
    236224    public void submit() {
    237225        tile.initLoading();
     
    246234            case FAILURE:
    247235                tile.setError("Problem loading tile");
     236                // no break intentional here
    248237            case SUCCESS:
    249238                handleNoTileAtZoom();
     
    254243                    }
    255244                }
     245                // no break intentional here
    256246            case REJECTED:
    257                 // do not set anything here, leave waiting sign
     247                // do nothing
    258248            }
    259249            if (listener != null) {
Note: See TracChangeset for help on using the changeset viewer.