Changeset 8403 in josm for trunk


Ignore:
Timestamp:
2015-05-20T21:47:44+02:00 (10 years ago)
Author:
wiktorn
Message:

Rework the per host limit, so the queue will never reject the submited job. Also - sort tiles during loading in TMS, so center tiles will be loaded first. closes: #11437

Location:
trunk/src/org/openstreetmap/josm
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/org/openstreetmap/josm/data/cache/ICachedLoaderListener.java

    r8344 r8403  
    1111        SUCCESS,
    1212        FAILURE,
    13         REJECTED
     13        CANCELED
    1414    }
    1515    /**
  • trunk/src/org/openstreetmap/josm/data/cache/JCSCachedTileLoaderJob.java

    r8401 r8403  
    1616import java.util.concurrent.ConcurrentHashMap;
    1717import java.util.concurrent.ConcurrentMap;
    18 import java.util.concurrent.Executor;
     18import java.util.concurrent.Executors;
    1919import java.util.concurrent.LinkedBlockingDeque;
    20 import java.util.concurrent.RejectedExecutionException;
     20import java.util.concurrent.ThreadFactory;
    2121import java.util.concurrent.ThreadPoolExecutor;
    2222import java.util.concurrent.TimeUnit;
     
    3535 *
    3636 * @param <K> cache entry key type
     37 * @param <V> cache value type
    3738 *
    3839 * Generic loader for HTTP based tiles. Uses custom attribute, to check, if entry has expired
     
    6263    public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10);
    6364
    64     public static class LIFOQueue extends LinkedBlockingDeque<Runnable> {
    65 
    66         /**
    67          * Constructs a new {@code LIFOQueue} with a capacity of {@link Integer#MAX_VALUE}.
    68          */
    69         public LIFOQueue() {
    70             super();
    71         }
    72 
    73         /**
    74          * Constructs a new {@code LIFOQueue} with the given (fixed) capacity.
    75          * @param capacity the capacity of this deque
    76          * @throws IllegalArgumentException if {@code capacity} is less than 1
    77          */
    78         public LIFOQueue(int capacity) {
    79             super(capacity);
    80         }
    81 
    82         @Override
    83         public boolean offer(Runnable t) {
    84             return super.offerFirst(t);
    85         }
    86 
    87         @Override
    88         public Runnable remove() {
    89             return super.removeFirst();
    90         }
    91     }
    92 
    93 
    94     /**
    95      * ThreadPoolExecutor starts new threads, until THREAD_LIMIT is reached. Then it puts tasks into LIFOQueue, which is fairly
    96      * small, but we do not want a lot of outstanding tasks queued, but rather prefer the class consumer to resubmit the task, which are
    97      * important right now.
     65    /*
     66     * ThreadPoolExecutor starts new threads, until THREAD_LIMIT is reached. Then it puts tasks into LinkedBlockingDeque.
    9867     *
    99      * This way, if some task gets outdated (for example - user paned the map, and we do not want to download this tile any more),
    100      * the task will not be resubmitted, and thus - never queued.
     68     * The queue works FIFO, so one needs to take care about ordering of the entries submitted
    10169     *
    10270     * There is no point in canceling tasks, that are already taken by worker threads (if we made so much effort, we can at least cache
     
    10472     * and performance (we do want to have something to offer to worker threads before tasks will be resubmitted by class consumer)
    10573     */
    106     private static Executor DEFAULT_DOWNLOAD_JOB_DISPATCHER = new ThreadPoolExecutor(
     74
     75    private static ThreadPoolExecutor DEFAULT_DOWNLOAD_JOB_DISPATCHER = new ThreadPoolExecutor(
    10776            2, // we have a small queue, so threads will be quickly started (threads are started only, when queue is full)
    10877            THREAD_LIMIT.get().intValue(), // do not this number of threads
     
    11079            TimeUnit.SECONDS,
    11180            // make queue of LIFO type - so recently requested tiles will be loaded first (assuming that these are which user is waiting to see)
    112             new LIFOQueue(5)
     81            new LinkedBlockingDeque<Runnable>(),
     82            getNamedThreadFactory("JCS downloader")
    11383            );
     84
     85    public static ThreadFactory getNamedThreadFactory(final String name) {
     86        return new ThreadFactory(){
     87            public Thread newThread(Runnable r) {
     88                Thread t = Executors.defaultThreadFactory().newThread(r);
     89                t.setName(name);
     90                return t;
     91                }
     92        };
     93    }
    11494
    11595    private static ConcurrentMap<String,Set<ICachedLoaderListener>> inProgress = new ConcurrentHashMap<>();
     
    127107    private int readTimeout;
    128108    private Map<String, String> headers;
    129     private Executor downloadJobExecutor;
     109    private ThreadPoolExecutor downloadJobExecutor;
     110    private Runnable finishTask;
    130111
    131112    /**
     
    139120            int connectTimeout, int readTimeout,
    140121            Map<String, String> headers,
    141             Executor downloadJobExecutor) {
     122            ThreadPoolExecutor downloadJobExecutor) {
    142123
    143124        this.cache = cache;
     
    209190            }
    210191            // object not in cache, so submit work to separate thread
    211             try {
    212                 if (executionGuard()) {
    213                     // use getter method, so subclasses may override executors, to get separate thread pool
    214                     getDownloadExecutor().execute(this);
    215                 } else {
    216                     log.log(Level.FINE, "JCS - guard rejected job for: {0}", getCacheKey());
    217                     finishLoading(LoadResult.REJECTED);
    218                 }
    219             } catch (RejectedExecutionException e) {
    220                 // queue was full, try again later
    221                 log.log(Level.FINE, "JCS - rejected job for: {0}", getCacheKey());
    222                 finishLoading(LoadResult.REJECTED);
    223             }
    224         }
    225     }
    226 
    227     /**
    228      * Guard method for execution. If guard returns true, the execution of download task will commence
    229      * otherwise, execution will finish with result LoadResult.REJECTED
    230      *
    231      * It is responsibility of the overriding class, to handle properly situation in finishLoading class
    232      * @return
    233      */
    234     protected boolean executionGuard() {
    235         return true;
     192            getDownloadExecutor().execute(this);
     193        }
    236194    }
    237195
     
    240198     */
    241199    protected void executionFinished() {
     200        if (finishTask != null) {
     201            finishTask.run();
     202        }
    242203    }
    243204
     
    269230     * this needs to be non-static, so it can be overridden by subclasses
    270231     */
    271     protected Executor getDownloadExecutor() {
     232    protected ThreadPoolExecutor getDownloadExecutor() {
    272233        return downloadJobExecutor;
    273234    }
     
    498459        }
    499460    }
     461
     462    /**
     463     * TODO: move to JobFactory
     464     * cancels all outstanding tasks in the queue.
     465     */
     466    public void cancelOutstandingTasks() {
     467        ThreadPoolExecutor downloadExecutor = getDownloadExecutor();
     468        for(Runnable r: downloadExecutor.getQueue()) {
     469            if (downloadExecutor.remove(r)) {
     470                if (r instanceof JCSCachedTileLoaderJob) {
     471                    ((JCSCachedTileLoaderJob<?, ?>) r).handleJobCancellation();
     472                }
     473            }
     474        }
     475    }
     476
     477    /**
     478     * Sets a job, that will be run, when job will finish execution
     479     * @param runnable that will be executed
     480     */
     481    public void setFinishedTask(Runnable runnable) {
     482        this.finishTask = runnable;
     483
     484    }
     485
     486    /**
     487     * Marks this job as canceled
     488     */
     489    public void handleJobCancellation() {
     490        finishLoading(LoadResult.CANCELED);
     491    }
    500492}
  • trunk/src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoader.java

    r8397 r8403  
    1616import org.openstreetmap.gui.jmapviewer.interfaces.TileSource;
    1717import org.openstreetmap.josm.data.cache.BufferedImageCacheEntry;
     18import org.openstreetmap.josm.data.cache.HostLimitQueue;
    1819import org.openstreetmap.josm.data.cache.JCSCacheManager;
    19 import org.openstreetmap.josm.data.cache.JCSCachedTileLoaderJob.LIFOQueue;
     20import org.openstreetmap.josm.data.cache.JCSCachedTileLoaderJob;
    2021import org.openstreetmap.josm.data.preferences.IntegerProperty;
    2122
     
    3435    private TileLoaderListener listener;
    3536    private static final String PREFERENCE_PREFIX   = "imagery.tms.cache.";
    36     // average tile size is about 20kb
     37    /**
     38     * how many object on disk should be stored for TMS region. Average tile size is about 20kb
     39     */
    3740    public static final IntegerProperty MAX_OBJECTS_ON_DISK = new IntegerProperty(PREFERENCE_PREFIX + "max_objects_disk", 25000); // 25000 is around 500MB under this assumptions
    3841
     
    4144     */
    4245    public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobs", 25);
     46
     47    /**
     48     * Limit definition for per host concurrent connections
     49     */
     50    public static final IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
     51
    4352
    4453    /**
     
    5463                30, // keepalive for thread
    5564                TimeUnit.SECONDS,
    56                 // make queue of LIFO type - so recently requested tiles will be loaded first (assuming that these are which user is waiting to see)
    57                 new LIFOQueue()
    58                     /* keep the queue size fairly small, we do not want to
    59                      download a lot of tiles, that user is not seeing anyway */
     65                new HostLimitQueue(HOST_LIMIT.get().intValue()),
     66                JCSCachedTileLoaderJob.getNamedThreadFactory("TMS downloader")
    6067                );
    6168    }
     
    123130
    124131    /**
    125      * Sets the download executor for this tile loader factory. Enables to use different queuing method
    126      * for this factory.
    127      * @param downloadExecutor
    128      */
    129     public void setDownloadExecutor(ThreadPoolExecutor downloadExecutor) {
    130         this.downloadExecutor = downloadExecutor;
    131     }
    132 
    133     /**
    134      * @return Executor that handles the jobs for this tile loader
    135      */
    136     public ThreadPoolExecutor getDownloadExecutor() {
    137         return downloadExecutor;
    138     }
    139 
    140     /**
    141132     * cancels all outstanding tasks in the queue. This rollbacks the state of the tiles in the queue
    142133     * to loading = false / loaded = false
    143134     */
    144135    public void cancelOutstandingTasks() {
    145         for(Runnable elem: downloadExecutor.getQueue()) {
    146             if (elem instanceof TMSCachedTileLoaderJob) {
    147                 TMSCachedTileLoaderJob loaderJob = (TMSCachedTileLoaderJob) elem;
    148                 if (downloadExecutor.remove(loaderJob)) {
    149                     Tile t = loaderJob.getTile();
    150                     t.finishLoading();
    151                     t.setLoaded(false);
    152                 }
     136        for(Runnable r: downloadExecutor.getQueue()) {
     137            if (downloadExecutor.remove(r) && r instanceof TMSCachedTileLoaderJob) {
     138                ((TMSCachedTileLoaderJob)r).handleJobCancellation();
    153139            }
    154140        }
    155141    }
    156 
    157142}
  • trunk/src/org/openstreetmap/josm/data/imagery/TMSCachedTileLoaderJob.java

    r8397 r8403  
    1313import java.util.concurrent.ConcurrentHashMap;
    1414import java.util.concurrent.ConcurrentMap;
    15 import java.util.concurrent.Executor;
    16 import java.util.concurrent.Semaphore;
     15import java.util.concurrent.ThreadPoolExecutor;
    1716import java.util.logging.Level;
    1817import java.util.logging.Logger;
     
    2524import org.openstreetmap.gui.jmapviewer.interfaces.TileSource;
    2625import org.openstreetmap.gui.jmapviewer.tilesources.AbstractTMSTileSource;
    27 import org.openstreetmap.josm.Main;
    2826import org.openstreetmap.josm.data.cache.BufferedImageCacheEntry;
    2927import org.openstreetmap.josm.data.cache.CacheEntry;
     
    3129import org.openstreetmap.josm.data.cache.ICachedLoaderListener;
    3230import org.openstreetmap.josm.data.cache.JCSCachedTileLoaderJob;
    33 import org.openstreetmap.josm.data.preferences.IntegerProperty;
    3431
    3532/**
     
    4946
    5047    /**
    51      * Limit definition for per host concurrent connections
    52      */
    53     public static final IntegerProperty HOST_LIMIT = new IntegerProperty("imagery.tms.tmsloader.maxjobsperhost", 6);
    54 
    55      /*
    56      * Host limit guards the area - between submission to the queue up to loading is finished. It uses executionGuard method
    57      * from JCSCachedTileLoaderJob to acquire the semaphore, and releases it - when loadingFinished is called (but not when
    58      * LoadResult.GUARD_REJECTED is set)
    59      *
    60      */
    61 
    62     private Semaphore getSemaphore() {
    63         String host = getUrl().getHost();
    64         Semaphore limit = HOST_LIMITS.get(host);
    65         if (limit == null) {
    66             synchronized(HOST_LIMITS) {
    67                 limit = HOST_LIMITS.get(host);
    68                 if (limit == null) {
    69                     limit = new Semaphore(HOST_LIMIT.get().intValue());
    70                     HOST_LIMITS.put(host, limit);
    71                 }
    72             }
    73         }
    74         return limit;
    75     }
    76 
    77     private boolean acquireSemaphore() {
    78         boolean ret = true;
    79         Semaphore limit = getSemaphore();
    80         if (limit != null) {
    81             ret = limit.tryAcquire();
    82             if (!ret) {
    83                 Main.debug("rejecting job because of per host limit");
    84             }
    85         }
    86         return ret;
    87     }
    88 
    89     private void releaseSemaphore() {
    90         Semaphore limit = getSemaphore();
    91         if (limit != null) {
    92             limit.release();
    93         }
    94     }
    95 
    96     private static Map<String, Semaphore> HOST_LIMITS = new ConcurrentHashMap<>();
    97 
    98     /**
    99      * Reconfigures download dispatcher using current values of THREAD_LIMIT and HOST_LIMIT
    100      */
    101     public static final void reconfigureDownloadDispatcher() {
    102         HOST_LIMITS = new ConcurrentHashMap<>();
    103     }
    104 
    105     /**
    10648     * Constructor for creating a job, to get a specific tile from cache
    10749     * @param listener
     
    11153     * @param readTimeout when connecting to remote resource
    11254     * @param headers to be sent together with request
     55     * @param downloadExecutor that will be executing the jobs
    11356     */
    11457    public TMSCachedTileLoaderJob(TileLoaderListener listener, Tile tile,
    11558            ICacheAccess<String, BufferedImageCacheEntry> cache,
    11659            int connectTimeout, int readTimeout, Map<String, String> headers,
    117             Executor downloadExecutor) {
     60            ThreadPoolExecutor downloadExecutor) {
    11861        super(cache, connectTimeout, readTimeout, headers, downloadExecutor);
    11962        this.tile = tile;
     
    207150    }
    208151
    209     @Override
    210     protected boolean executionGuard() {
    211         return acquireSemaphore();
    212     }
    213 
    214     @Override
    215     protected void executionFinished() {
    216         releaseSemaphore();
    217     }
    218 
    219152    public void submit() {
    220153        tile.initLoading();
     
    234167                tile.finishLoading(); // whatever happened set that loading has finished
    235168                switch(result){
    236                 case FAILURE:
    237                     tile.setError("Problem loading tile");
    238                     // no break intentional here
    239169                case SUCCESS:
    240170                    handleNoTileAtZoom();
     
    249179                        tile.setError(tr("HTTP error {0} when loading tiles", httpStatusCode));
    250180                    }
     181                    break;
     182                case FAILURE:
     183                    tile.setError("Problem loading tile");
    251184                    // no break intentional here
    252                 case REJECTED:
     185                case CANCELED:
    253186                    // do nothing
    254187                }
  • trunk/src/org/openstreetmap/josm/gui/layer/TMSLayer.java

    r8401 r8403  
    2222import java.util.ArrayList;
    2323import java.util.Collections;
     24import java.util.Comparator;
    2425import java.util.HashMap;
    2526import java.util.LinkedList;
     
    11941195        }
    11951196
     1197        private Comparator<Tile> getTileDistanceComparator() {
     1198            final int centerX = (int) Math.ceil((x0 + x1) / 2);
     1199            final int centerY = (int) Math.ceil((y0 + y1) / 2);
     1200            return new Comparator<Tile>() {
     1201                private int getDistance(Tile t) {
     1202                    return Math.abs(t.getXtile() - centerX) + Math.abs(t.getYtile() - centerY);
     1203                }
     1204                @Override
     1205                public int compare(Tile o1, Tile o2) {
     1206                    int distance1 = getDistance(o1);
     1207                    int distance2 = getDistance(o2);
     1208                    return Integer.compare(distance1, distance2);
     1209                }
     1210            };
     1211        }
     1212
    11961213        private void loadAllTiles(boolean force) {
    11971214            if (!autoLoad && !force)
    11981215                return;
    1199             for (Tile t : this.allTilesCreate()) {
     1216            List<Tile> allTiles = allTilesCreate();
     1217            Collections.sort(allTiles, getTileDistanceComparator());
     1218            for (Tile t : allTiles) { //, getTileDistanceComparator())) {
    12001219                loadTile(t, false);
    12011220            }
  • trunk/src/org/openstreetmap/josm/gui/preferences/imagery/TMSSettingsPanel.java

    r8307 r8403  
    4545        maxElementsOnDisk = new JSpinner(new SpinnerNumberModel(TMSCachedTileLoader.MAX_OBJECTS_ON_DISK.get().intValue(), 0, Integer.MAX_VALUE, 1));
    4646        maxConcurrentDownloads = new JSpinner(new SpinnerNumberModel(TMSCachedTileLoaderJob.THREAD_LIMIT.get().intValue(), 0, Integer.MAX_VALUE, 1));
    47         maxDownloadsPerHost = new JSpinner(new SpinnerNumberModel(TMSCachedTileLoaderJob.HOST_LIMIT.get().intValue(), 0, Integer.MAX_VALUE, 1));
     47        maxDownloadsPerHost = new JSpinner(new SpinnerNumberModel(TMSCachedTileLoader.HOST_LIMIT.get().intValue(), 0, Integer.MAX_VALUE, 1));
    4848
    4949        add(new JLabel(tr("Auto zoom by default: ")), GBC.std());
     
    9898        this.maxElementsOnDisk.setValue(TMSCachedTileLoader.MAX_OBJECTS_ON_DISK.get());
    9999        this.maxConcurrentDownloads.setValue(TMSCachedTileLoaderJob.THREAD_LIMIT.get());
    100         this.maxDownloadsPerHost.setValue(TMSCachedTileLoaderJob.HOST_LIMIT.get());
     100        this.maxDownloadsPerHost.setValue(TMSCachedTileLoader.HOST_LIMIT.get());
    101101    }
    102102
     
    117117        TMSLayer.setMinZoomLvl((Integer)this.minZoomLvl.getValue());
    118118
    119         TMSCachedTileLoader.MAX_OBJECTS_ON_DISK.put((Integer) this.maxElementsOnDisk.getValue());
     119        if (!TMSCachedTileLoader.MAX_OBJECTS_ON_DISK.get().equals(this.maxElementsOnDisk.getValue())) {
     120            TMSCachedTileLoader.MAX_OBJECTS_ON_DISK.put((Integer) this.maxElementsOnDisk.getValue());
     121            restartRequired = true;
     122        }
    120123
    121         TMSCachedTileLoaderJob.THREAD_LIMIT.put((Integer) this.maxConcurrentDownloads.getValue());
    122         TMSCachedTileLoaderJob.HOST_LIMIT.put((Integer) this.maxDownloadsPerHost.getValue());
    123         TMSCachedTileLoaderJob.reconfigureDownloadDispatcher();
     124        if(!TMSCachedTileLoader.THREAD_LIMIT.get().equals(this.maxConcurrentDownloads.getValue())) {
     125            TMSCachedTileLoader.THREAD_LIMIT.put((Integer) this.maxConcurrentDownloads.getValue());
     126            restartRequired = true;
     127        }
     128
     129        if(!TMSCachedTileLoader.HOST_LIMIT.get().equals(this.maxDownloadsPerHost.getValue())) {
     130            TMSCachedTileLoader.HOST_LIMIT.put((Integer) this.maxDownloadsPerHost.getValue());
     131            restartRequired = true;
     132        }
    124133
    125134        if (!TMSLayer.PROP_TILECACHE_DIR.get().equals(this.tilecacheDir.getText())) {
Note: See TracChangeset for help on using the changeset viewer.