Ticket #10902: TMS_simultaneous_connections_v02.patch

File TMS_simultaneous_connections_v02.patch, 12.3 KB (added by wiktorn, 9 years ago)
  • src/org/openstreetmap/josm/data/cache/JCSCachedTileLoaderJob.java

    diff --git src/org/openstreetmap/josm/data/cache/JCSCachedTileLoaderJob.java src/org/openstreetmap/josm/data/cache/JCSCachedTileLoaderJob.java
    index a474c78..02fd4ba 100644
    public abstract class JCSCachedTileLoaderJob<K, V extends CacheEntry> implements  
    5555    /**
    5656     * maximum download threads that will be started
    5757     */
    58     public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10);
     58    public final static IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10);
     59
     60    public static class LIFOQueue extends LinkedBlockingDeque<Runnable> {
     61        public LIFOQueue(int capacity) {
     62            super(capacity);
     63        }
     64
     65        @Override
     66        public boolean offer(Runnable t) {
     67            return super.offerFirst(t);
     68        }
     69
     70        @Override
     71        public Runnable remove() {
     72            return super.removeFirst();
     73        }
     74    }
     75
     76
     77    /*
     78     * ThreadPoolExecutor starts new threads, until THREAD_LIMIT is reached. Then it puts tasks into LIFOQueue, which is fairly
     79     * small, but we do not want a lot of outstanding tasks queued, but rather prefer the class consumer to resubmit the task, which are
     80     * important right now.
     81     *
     82     * This way, if some task gets outdated (for example - user paned the map, and we do not want to download this tile any more),
     83     * the task will not be resubmitted, and thus - never queued.
     84     *
     85     * There is no point in canceling tasks, that are already taken by worker threads (if we made so much effort, we can at least cache
     86     * the response, so later it could be used). We could actually cancel what is in LIFOQueue, but this is a tradeoff between simplicity
     87     * and performance (we do want to have something to offer to worker threads before tasks will be resubmitted by class consumer)
     88     */
    5989    private static Executor DOWNLOAD_JOB_DISPATCHER = new ThreadPoolExecutor(
    6090            2, // we have a small queue, so threads will be quickly started (threads are started only, when queue is full)
    6191            THREAD_LIMIT.get().intValue(), // do not this number of threads
    6292            30, // keepalive for thread
    6393            TimeUnit.SECONDS,
    6494            // make queue of LIFO type - so recently requested tiles will be loaded first (assuming that these are which user is waiting to see)
    65             new LinkedBlockingDeque<Runnable>(5) {
    66                 /* keep the queue size fairly small, we do not want to
    67                  download a lot of tiles, that user is not seeing anyway */
    68                 @Override
    69                 public boolean offer(Runnable t) {
    70                     return super.offerFirst(t);
    71                 }
    72 
    73                 @Override
    74                 public Runnable remove() {
    75                     return super.removeFirst();
    76                 }
    77             }
     95            new LIFOQueue(5)
    7896            );
     97
    7998    private static ConcurrentMap<String,Set<ICachedLoaderListener>> inProgress = new ConcurrentHashMap<>();
    8099    private static ConcurrentMap<String, Boolean> useHead = new ConcurrentHashMap<>();
    81100
    public abstract class JCSCachedTileLoaderJob<K, V extends CacheEntry> implements  
    156175            }
    157176            // object not in cache, so submit work to separate thread
    158177            try {
    159                 // use getter method, so subclasses may override executors, to get separate thread pool
    160                 getDownloadExecutor().execute(JCSCachedTileLoaderJob.this);
     178                if (executionGuard()) {
     179                    // use getter method, so subclasses may override executors, to get separate thread pool
     180                    getDownloadExecutor().execute(this);
     181                } else {
     182                    log.log(Level.FINE, "JCS - guard rejected job for: {0}", getCacheKey());
     183                    finishLoading(LoadResult.REJECTED);
     184                }
    161185            } catch (RejectedExecutionException e) {
    162186                // queue was full, try again later
    163187                log.log(Level.FINE, "JCS - rejected job for: {0}", getCacheKey());
    public abstract class JCSCachedTileLoaderJob<K, V extends CacheEntry> implements  
    167191    }
    168192
    169193    /**
     194     * Guard method for execution. If guard returns true, the execution of download task will commence
     195     * otherwise, execution will finish with result LoadResult.REJECTED
     196     *
     197     * It is responsibility of the overriding class, to handle properly situation in finishLoading class
     198     * @return
     199     */
     200    protected boolean executionGuard() {
     201        return true;
     202    }
     203
     204    /**
     205     * This method is run when job has finished
     206     */
     207    protected void executionFinished() {
     208    }
     209
     210    /**
    170211     *
    171212     * @return checks if object from cache has sufficient data to be returned
    172213     */
    public abstract class JCSCachedTileLoaderJob<K, V extends CacheEntry> implements  
    218259                }
    219260            }
    220261        } finally {
     262            executionFinished();
    221263            currentThread.setName(oldName);
    222264        }
    223265    }
  • src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoaderJob.java

    diff --git src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoaderJob.java src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoaderJob.java
    index ad46d1a..22d83fb 100644
    import java.net.URL;  
    77import java.util.Map;
    88import java.util.concurrent.ConcurrentHashMap;
    99import java.util.concurrent.Executor;
    10 import java.util.concurrent.LinkedBlockingDeque;
    1110import java.util.concurrent.Semaphore;
    1211import java.util.concurrent.ThreadPoolExecutor;
    1312import java.util.concurrent.TimeUnit;
    public class TMSCachedTileLoaderJob extends JCSCachedTileLoaderJob<String, Buffe  
    4140    private volatile URL url;
    4241
    4342    /**
    44      * overrides the THREAD_LIMIT in superclass, as we want to have separate limit and pool for TMS
    45      */
    46     public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25);
    47 
    48     /**
    4943     * Limit definition for per host concurrent connections
    5044     */
    51     public static final IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
     45    public final static IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
    5246
     47     /*
     48     * Host limit guards the area - between submission to the queue up to loading is finished. It uses executionGuard method
     49     * from JCSCachedTileLoaderJob to acquire the semaphore, and releases it - when loadingFinished is called (but not when
     50     * LoadResult.GUARD_REJECTED is set)
     51     *
     52     */
    5353
    54     private static class LIFOQueue extends LinkedBlockingDeque<Runnable> {
    55         public LIFOQueue(int capacity) {
    56             super(capacity);
    57         }
    58 
    59         private final static Semaphore getSemaphore(Runnable r) {
    60             if (!(r instanceof TMSCachedTileLoaderJob))
    61                 return null;
    62             TMSCachedTileLoaderJob cachedJob = (TMSCachedTileLoaderJob) r;
    63             Semaphore limit = HOST_LIMITS.get(cachedJob.getUrl().getHost());
    64             if (limit == null) {
    65                 synchronized(HOST_LIMITS) {
    66                     limit = HOST_LIMITS.get(cachedJob.getUrl().getHost());
    67                     if (limit == null) {
    68                         limit = new Semaphore(HOST_LIMIT.get().intValue());
    69                         HOST_LIMITS.put(cachedJob.getUrl().getHost(), limit);
    70                     }
     54    private Semaphore getSemaphore() {
     55        String host = getUrl().getHost();
     56        Semaphore limit = HOST_LIMITS.get(host);
     57        if (limit == null) {
     58            synchronized(HOST_LIMITS) {
     59                limit = HOST_LIMITS.get(host);
     60                if (limit == null) {
     61                    limit = new Semaphore(HOST_LIMIT.get().intValue());
     62                    HOST_LIMITS.put(host, limit);
    7163                }
    7264            }
    73             return limit;
    7465        }
     66        return limit;
     67    }
    7568
    76         private boolean acquireSemaphore(Runnable r) {
    77             boolean ret = true;
    78             Semaphore limit = getSemaphore(r);
    79             if (limit != null) {
    80                 ret = limit.tryAcquire();
    81                 if (!ret) {
    82                     Main.debug("rejecting job because of per host limit");
    83                 }
     69    private boolean acquireSemaphore() {
     70        boolean ret = true;
     71        Semaphore limit = getSemaphore();
     72        if (limit != null) {
     73            ret = limit.tryAcquire();
     74            if (!ret) {
     75                Main.debug("rejecting job because of per host limit");
    8476            }
    85             return ret;
    86         }
    87 
    88         @Override
    89         public boolean offer(Runnable t) {
    90             return acquireSemaphore(t) && super.offerFirst(t);
    91         }
    92 
    93         private Runnable releaseSemaphore(Runnable r) {
    94             Semaphore limit = getSemaphore(r);
    95             if (limit != null)
    96                 limit.release();
    97             return r;
    9877        }
     78        return ret;
    9979
    100         @Override
    101         public Runnable remove() {
    102             return releaseSemaphore(super.removeFirst());
    103         }
    104 
    105         @Override
    106         public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    107             return releaseSemaphore(super.poll(timeout, unit));
    108         }
     80    }
    10981
    110         @Override
    111         public Runnable take() throws InterruptedException {
    112             return releaseSemaphore(super.take());
     82    private void releaseSemaphore() {
     83        Semaphore limit = getSemaphore();
     84        if (limit != null) {
     85            limit.release();
    11386        }
    11487    }
    11588
     89
    11690    private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>();
    11791
    11892    /**
     93     * overrides the THREAD_LIMIT in superclass, as we want to have separate limit and pool for TMS
     94     */
     95    public final static IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25);
     96
     97    /**
    11998     * separate from JCS thread pool for TMS loader, so we can have different thread pools for default JCS
    12099     * and for TMS imagery
    121100     */
    public class TMSCachedTileLoaderJob extends JCSCachedTileLoaderJob<String, Buffe  
    142121        DOWNLOAD_JOB_DISPATCHER = getThreadPoolExecutor();
    143122    }
    144123
    145 
    146124    /**
    147125     * Constructor for creating a job, to get a specific tile from cache
    148126     * @param listener
    public class TMSCachedTileLoaderJob extends JCSCachedTileLoaderJob<String, Buffe  
    174152    /*
    175153     *  this doesn't needs to be synchronized, as it's not that costly to keep only one execution
    176154     *  in parallel, but URL creation and Tile.getUrl() are costly and are not needed when fetching
    177      *  data from cache
     155     *  data from cache, that's why URL creation is postponed until it's needed
    178156     *
    179157     *  We need to have static url value for TileLoaderJob, as for some TileSources we might get different
    180158     *  URL's each call we made (servers switching), and URL's are used below as a key for duplicate detection
    public class TMSCachedTileLoaderJob extends JCSCachedTileLoaderJob<String, Buffe  
    233211        return DOWNLOAD_JOB_DISPATCHER;
    234212    }
    235213
     214    @Override
     215    protected boolean executionGuard() {
     216        return acquireSemaphore();
     217    }
     218
     219    @Override
     220    protected void executionFinished() {
     221        releaseSemaphore();
     222    }
     223
    236224    public void submit() {
    237225        tile.initLoading();
    238226        super.submit(this);
    public class TMSCachedTileLoaderJob extends JCSCachedTileLoaderJob<String, Buffe  
    245233            switch(result){
    246234            case FAILURE:
    247235                tile.setError("Problem loading tile");
     236                // no break intentional here
    248237            case SUCCESS:
    249238                handleNoTileAtZoom();
    250239                if (object != null) {
    public class TMSCachedTileLoaderJob extends JCSCachedTileLoaderJob<String, Buffe  
    253242                        tile.loadImage(new ByteArrayInputStream(content));
    254243                    }
    255244                }
     245                // no break intentional here
    256246            case REJECTED:
    257                 // do not set anything here, leave waiting sign
     247                // do nothing
    258248            }
    259249            if (listener != null) {
    260250                listener.tileLoadingFinished(tile, result.equals(LoadResult.SUCCESS));