source: josm/trunk/src/org/openstreetmap/josm/data/cache/JCSCachedTileLoaderJob.java@ 8397

Last change on this file since 8397 was 8397, checked in by wiktorn, 9 years ago

addresses #11437 - introduce infinite queue for tile loading and clear the queue when user pans the map or changes the zoom. Fixing hosts limit is last problem, thay may incur additional load

File size: 19.5 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.io.ByteArrayOutputStream;
5import java.io.FileNotFoundException;
6import java.io.IOException;
7import java.io.InputStream;
8import java.net.HttpURLConnection;
9import java.net.URL;
10import java.net.URLConnection;
11import java.util.HashSet;
12import java.util.List;
13import java.util.Map;
14import java.util.Random;
15import java.util.Set;
16import java.util.concurrent.ConcurrentHashMap;
17import java.util.concurrent.ConcurrentMap;
18import java.util.concurrent.Executor;
19import java.util.concurrent.LinkedBlockingDeque;
20import java.util.concurrent.RejectedExecutionException;
21import java.util.concurrent.ThreadPoolExecutor;
22import java.util.concurrent.TimeUnit;
23import java.util.logging.Level;
24import java.util.logging.Logger;
25
26import org.apache.commons.jcs.access.behavior.ICacheAccess;
27import org.apache.commons.jcs.engine.behavior.ICacheElement;
28import org.openstreetmap.gui.jmapviewer.FeatureAdapter;
29import org.openstreetmap.josm.Main;
30import org.openstreetmap.josm.data.cache.ICachedLoaderListener.LoadResult;
31import org.openstreetmap.josm.data.preferences.IntegerProperty;
32
33/**
34 * @author Wiktor Niesiobędzki
35 *
36 * @param <K> cache entry key type
37 *
38 * Generic loader for HTTP based tiles. Uses custom attribute, to check, if entry has expired
39 * according to HTTP headers sent with tile. If so, it tries to verify using Etags
40 * or If-Modified-Since / Last-Modified.
41 *
42 * If the tile is not valid, it will try to download it from remote service and put it
43 * to cache. If remote server will fail it will try to use stale entry.
44 *
45 * This class will keep only one Job running for specified tile. All others will just finish, but
46 * listeners will be gathered and notified, once download job will be finished
47 *
48 * @since 8168
49 */
50public abstract class JCSCachedTileLoaderJob<K, V extends CacheEntry> implements ICachedLoaderJob<K>, Runnable {
51 private static final Logger log = FeatureAdapter.getLogger(JCSCachedTileLoaderJob.class.getCanonicalName());
52 protected static final long DEFAULT_EXPIRE_TIME = 1000L * 60 * 60 * 24 * 7; // 7 days
53 // Limit for the max-age value send by the server.
54 protected static final long EXPIRE_TIME_SERVER_LIMIT = 1000L * 60 * 60 * 24 * 28; // 4 weeks
55 // Absolute expire time limit. Cached tiles that are older will not be used,
56 // even if the refresh from the server fails.
57 protected static final long ABSOLUTE_EXPIRE_TIME_LIMIT = Long.MAX_VALUE; // unlimited
58
59 /**
60 * maximum download threads that will be started
61 */
62 public static final IntegerProperty THREAD_LIMIT = new IntegerProperty("cache.jcs.max_threads", 10);
63
64 public static class LIFOQueue extends LinkedBlockingDeque<Runnable> {
65 public LIFOQueue() {
66 super();
67 }
68
69 public LIFOQueue(int capacity) {
70 super(capacity);
71 }
72
73 @Override
74 public boolean offer(Runnable t) {
75 return super.offerFirst(t);
76 }
77
78 @Override
79 public Runnable remove() {
80 return super.removeFirst();
81 }
82 }
83
84
85 /*
86 * ThreadPoolExecutor starts new threads, until THREAD_LIMIT is reached. Then it puts tasks into LIFOQueue, which is fairly
87 * small, but we do not want a lot of outstanding tasks queued, but rather prefer the class consumer to resubmit the task, which are
88 * important right now.
89 *
90 * This way, if some task gets outdated (for example - user paned the map, and we do not want to download this tile any more),
91 * the task will not be resubmitted, and thus - never queued.
92 *
93 * There is no point in canceling tasks, that are already taken by worker threads (if we made so much effort, we can at least cache
94 * the response, so later it could be used). We could actually cancel what is in LIFOQueue, but this is a tradeoff between simplicity
95 * and performance (we do want to have something to offer to worker threads before tasks will be resubmitted by class consumer)
96 */
97 private static Executor DEFAULT_DOWNLOAD_JOB_DISPATCHER = new ThreadPoolExecutor(
98 2, // we have a small queue, so threads will be quickly started (threads are started only, when queue is full)
99 THREAD_LIMIT.get().intValue(), // do not this number of threads
100 30, // keepalive for thread
101 TimeUnit.SECONDS,
102 // make queue of LIFO type - so recently requested tiles will be loaded first (assuming that these are which user is waiting to see)
103 new LIFOQueue(5)
104 );
105
106 private static ConcurrentMap<String,Set<ICachedLoaderListener>> inProgress = new ConcurrentHashMap<>();
107 private static ConcurrentMap<String, Boolean> useHead = new ConcurrentHashMap<>();
108
109 private long now; // when the job started
110
111 private ICacheAccess<K, V> cache;
112 private ICacheElement<K, V> cacheElement;
113 protected V cacheData = null;
114 protected CacheEntryAttributes attributes = null;
115
116 // HTTP connection parameters
117 private int connectTimeout;
118 private int readTimeout;
119 private Map<String, String> headers;
120 private Executor downloadJobExecutor;
121
122 /**
123 * @param cache cache instance that we will work on
124 * @param headers
125 * @param readTimeout
126 * @param connectTimeout
127 * @param downloadJobExecutor
128 */
129 public JCSCachedTileLoaderJob(ICacheAccess<K,V> cache,
130 int connectTimeout, int readTimeout,
131 Map<String, String> headers,
132 Executor downloadJobExecutor) {
133
134 this.cache = cache;
135 this.now = System.currentTimeMillis();
136 this.connectTimeout = connectTimeout;
137 this.readTimeout = readTimeout;
138 this.headers = headers;
139 this.downloadJobExecutor = downloadJobExecutor;
140 }
141
142 /**
143 * @param cache cache instance that we will work on
144 * @param headers
145 * @param readTimeout
146 * @param connectTimeout
147 */
148 public JCSCachedTileLoaderJob(ICacheAccess<K, V> cache,
149 int connectTimeout, int readTimeout,
150 Map<String, String> headers) {
151 this(cache, connectTimeout, readTimeout,
152 headers, DEFAULT_DOWNLOAD_JOB_DISPATCHER);
153 }
154
155 private void ensureCacheElement() {
156 if (cacheElement == null && getCacheKey() != null) {
157 cacheElement = cache.getCacheElement(getCacheKey());
158 if (cacheElement != null) {
159 attributes = (CacheEntryAttributes) cacheElement.getElementAttributes();
160 cacheData = cacheElement.getVal();
161 }
162 }
163 }
164
165 public V get() {
166 ensureCacheElement();
167 return cacheData;
168 }
169
170 @Override
171 public void submit(ICachedLoaderListener listener) {
172 boolean first = false;
173 URL url = getUrl();
174 String deduplicationKey = null;
175 if (url != null) {
176 // url might be null, for example when Bing Attribution is not loaded yet
177 deduplicationKey = url.toString();
178 }
179 if (deduplicationKey == null) {
180 log.log(Level.WARNING, "No url returned for: {0}, skipping", getCacheKey());
181 return;
182 }
183 synchronized (inProgress) {
184 Set<ICachedLoaderListener> newListeners = inProgress.get(deduplicationKey);
185 if (newListeners == null) {
186 newListeners = new HashSet<>();
187 inProgress.put(deduplicationKey, newListeners);
188 first = true;
189 }
190 newListeners.add(listener);
191 }
192
193 if (first) {
194 ensureCacheElement();
195 if (cacheElement != null && isCacheElementValid() && (isObjectLoadable())) {
196 // we got something in cache, and it's valid, so lets return it
197 log.log(Level.FINE, "JCS - Returning object from cache: {0}", getCacheKey());
198 finishLoading(LoadResult.SUCCESS);
199 return;
200 }
201 // object not in cache, so submit work to separate thread
202 try {
203 if (executionGuard()) {
204 // use getter method, so subclasses may override executors, to get separate thread pool
205 getDownloadExecutor().execute(this);
206 } else {
207 log.log(Level.FINE, "JCS - guard rejected job for: {0}", getCacheKey());
208 finishLoading(LoadResult.REJECTED);
209 }
210 } catch (RejectedExecutionException e) {
211 // queue was full, try again later
212 log.log(Level.FINE, "JCS - rejected job for: {0}", getCacheKey());
213 finishLoading(LoadResult.REJECTED);
214 }
215 }
216 }
217
218 /**
219 * Guard method for execution. If guard returns true, the execution of download task will commence
220 * otherwise, execution will finish with result LoadResult.REJECTED
221 *
222 * It is responsibility of the overriding class, to handle properly situation in finishLoading class
223 * @return
224 */
225 protected boolean executionGuard() {
226 return true;
227 }
228
229 /**
230 * This method is run when job has finished
231 */
232 protected void executionFinished() {
233 }
234
235 /**
236 *
237 * @return checks if object from cache has sufficient data to be returned
238 */
239 protected boolean isObjectLoadable() {
240 byte[] content = cacheData.getContent();
241 return content != null && content.length > 0;
242 }
243
244 /**
245 *
246 * @return cache object as empty, regardless of what remote resource has returned (ex. based on headers)
247 */
248 protected boolean cacheAsEmpty(Map<String, List<String>> headers, int statusCode, byte[] content) {
249 return false;
250 }
251
252 /**
253 * @return key under which discovered server settings will be kept
254 */
255 protected String getServerKey() {
256 return getUrl().getHost();
257 }
258
259 /**
260 * this needs to be non-static, so it can be overridden by subclasses
261 */
262 protected Executor getDownloadExecutor() {
263 return downloadJobExecutor;
264 }
265
266 public void run() {
267 final Thread currentThread = Thread.currentThread();
268 final String oldName = currentThread.getName();
269 currentThread.setName("JCS Downloading: " + getUrl());
270 try {
271 // try to load object from remote resource
272 if (loadObject()) {
273 finishLoading(LoadResult.SUCCESS);
274 } else {
275 // if loading failed - check if we can return stale entry
276 if (isObjectLoadable()) {
277 // try to get stale entry in cache
278 finishLoading(LoadResult.SUCCESS);
279 log.log(Level.FINE, "JCS - found stale object in cache: {0}", getUrl());
280 } else {
281 // failed completely
282 finishLoading(LoadResult.FAILURE);
283 }
284 }
285 } finally {
286 executionFinished();
287 currentThread.setName(oldName);
288 }
289 }
290
291
292 private void finishLoading(LoadResult result) {
293 Set<ICachedLoaderListener> listeners = null;
294 synchronized (inProgress) {
295 listeners = inProgress.remove(getUrl().toString());
296 }
297 if (listeners == null) {
298 log.log(Level.WARNING, "Listener not found for URL: {0}. Listener not notified!", getUrl());
299 return;
300 }
301 try {
302 for (ICachedLoaderListener l: listeners) {
303 l.loadingFinished(cacheData, attributes, result);
304 }
305 } catch (Exception e) {
306 log.log(Level.WARNING, "JCS - Error while loading object from cache: {0}; {1}", new Object[]{e.getMessage(), getUrl()});
307 Main.warn(e);
308 for (ICachedLoaderListener l: listeners) {
309 l.loadingFinished(cacheData, attributes, LoadResult.FAILURE);
310 }
311
312 }
313
314 }
315
316 private boolean isCacheElementValid() {
317 long expires = attributes.getExpirationTime();
318
319 // check by expire date set by server
320 if (expires != 0L) {
321 // put a limit to the expire time (some servers send a value
322 // that is too large)
323 expires = Math.min(expires, attributes.getCreateTime() + EXPIRE_TIME_SERVER_LIMIT);
324 if (now > expires) {
325 log.log(Level.FINE, "JCS - Object {0} has expired -> valid to {1}, now is: {2}", new Object[]{getUrl(), Long.toString(expires), Long.toString(now)});
326 return false;
327 }
328 } else {
329 // check by file modification date
330 if (now - attributes.getLastModification() > DEFAULT_EXPIRE_TIME) {
331 log.log(Level.FINE, "JCS - Object has expired, maximum file age reached {0}", getUrl());
332 return false;
333 }
334 }
335 return true;
336 }
337
338 /**
339 * @return true if object was successfully downloaded, false, if there was a loading failure
340 */
341
342 private boolean loadObject() {
343 try {
344 // if we have object in cache, and host doesn't support If-Modified-Since nor If-None-Match
345 // then just use HEAD request and check returned values
346 if (isObjectLoadable() &&
347 Boolean.TRUE.equals(useHead.get(getServerKey())) &&
348 isCacheValidUsingHead()) {
349 log.log(Level.FINE, "JCS - cache entry verified using HEAD request: {0}", getUrl());
350 return true;
351 }
352
353 HttpURLConnection urlConn = getURLConnection();
354
355 if (isObjectLoadable() &&
356 (now - attributes.getLastModification()) <= ABSOLUTE_EXPIRE_TIME_LIMIT) {
357 urlConn.setIfModifiedSince(attributes.getLastModification());
358 }
359 if (isObjectLoadable() && attributes.getEtag() != null) {
360 urlConn.addRequestProperty("If-None-Match", attributes.getEtag());
361 }
362 if (urlConn.getResponseCode() == 304) {
363 // If isModifiedSince or If-None-Match has been set
364 // and the server answers with a HTTP 304 = "Not Modified"
365 log.log(Level.FINE, "JCS - IfModifiedSince/Etag test: local version is up to date: {0}", getUrl());
366 return true;
367 } else if (isObjectLoadable()) {
368 // we have an object in cache, but we haven't received 304 resposne code
369 // check if we should use HEAD request to verify
370 if((attributes.getEtag() != null && attributes.getEtag().equals(urlConn.getRequestProperty("ETag"))) ||
371 attributes.getLastModification() == urlConn.getLastModified()) {
372 // we sent ETag or If-Modified-Since, but didn't get 304 response code
373 // for further requests - use HEAD
374 String serverKey = getServerKey();
375 log.log(Level.INFO, "JCS - Host: {0} found not to return 304 codes for If-Modifed-Since or If-None-Match headers", serverKey);
376 useHead.put(serverKey, Boolean.TRUE);
377 }
378 }
379
380 attributes = parseHeaders(urlConn);
381
382 for (int i = 0; i < 5; ++i) {
383 if (urlConn.getResponseCode() == 503) {
384 Thread.sleep(5000+(new Random()).nextInt(5000));
385 continue;
386 }
387
388 attributes.setResponseCode(urlConn.getResponseCode());
389 byte[] raw = read(urlConn);
390
391 if (!cacheAsEmpty(urlConn.getHeaderFields(), urlConn.getResponseCode(), raw) &&
392 raw != null && raw.length > 0) {
393 // we need to check cacheEmpty, so for cases, when data is returned, but we want to store
394 // as empty (eg. empty tile images) to save some space
395 cacheData = createCacheEntry(raw);
396 cache.put(getCacheKey(), cacheData, attributes);
397 log.log(Level.FINE, "JCS - downloaded key: {0}, length: {1}, url: {2}",
398 new Object[] {getCacheKey(), raw.length, getUrl()});
399 return true;
400 } else {
401 cacheData = createCacheEntry(new byte[]{});
402 cache.put(getCacheKey(), cacheData, attributes);
403 log.log(Level.FINE, "JCS - Caching empty object {0}", getUrl());
404 return true;
405 }
406 }
407 } catch (FileNotFoundException e) {
408 log.log(Level.FINE, "JCS - Caching empty object as server returned 404 for: {0}", getUrl());
409 cache.put(getCacheKey(), createCacheEntry(new byte[]{}), attributes);
410 return handleNotFound();
411 } catch (Exception e) {
412 log.log(Level.WARNING, "JCS - Exception during download {0}", getUrl());
413 Main.warn(e);
414 }
415 log.log(Level.WARNING, "JCS - Silent failure during download: {0}", getUrl());
416 return false;
417
418 }
419
420 /**
421 * @return if we should treat this object as properly loaded
422 */
423 protected abstract boolean handleNotFound();
424
425 protected abstract V createCacheEntry(byte[] content);
426
427 private CacheEntryAttributes parseHeaders(URLConnection urlConn) {
428 CacheEntryAttributes ret = new CacheEntryAttributes();
429
430 Long lng = urlConn.getExpiration();
431 if (lng.equals(0L)) {
432 try {
433 String str = urlConn.getHeaderField("Cache-Control");
434 if (str != null) {
435 for (String token: str.split(",")) {
436 if (token.startsWith("max-age=")) {
437 lng = Long.parseLong(token.substring(8)) * 1000 +
438 System.currentTimeMillis();
439 }
440 }
441 }
442 } catch (NumberFormatException e) {} //ignore malformed Cache-Control headers
443 }
444
445 ret.setExpirationTime(lng);
446 ret.setLastModification(now);
447 ret.setEtag(urlConn.getHeaderField("ETag"));
448 return ret;
449 }
450
451 private HttpURLConnection getURLConnection() throws IOException {
452 HttpURLConnection urlConn = (HttpURLConnection) getUrl().openConnection();
453 urlConn.setRequestProperty("Accept", "text/html, image/png, image/jpeg, image/gif, */*");
454 urlConn.setReadTimeout(readTimeout); // 30 seconds read timeout
455 urlConn.setConnectTimeout(connectTimeout);
456 for(Map.Entry<String, String> e: headers.entrySet()) {
457 urlConn.setRequestProperty(e.getKey(), e.getValue());
458 }
459 return urlConn;
460 }
461
462 private boolean isCacheValidUsingHead() throws IOException {
463 HttpURLConnection urlConn = (HttpURLConnection) getUrl().openConnection();
464 urlConn.setRequestMethod("HEAD");
465 long lastModified = urlConn.getLastModified();
466 return (attributes.getEtag() != null && attributes.getEtag().equals(urlConn.getRequestProperty("ETag"))) ||
467 (lastModified != 0 && lastModified <= attributes.getLastModification());
468 }
469
470 private static byte[] read(URLConnection urlConn) throws IOException {
471 InputStream input = urlConn.getInputStream();
472 try {
473 ByteArrayOutputStream bout = new ByteArrayOutputStream(input.available());
474 byte[] buffer = new byte[2048];
475 boolean finished = false;
476 do {
477 int read = input.read(buffer);
478 if (read >= 0) {
479 bout.write(buffer, 0, read);
480 } else {
481 finished = true;
482 }
483 } while (!finished);
484 if (bout.size() == 0)
485 return null;
486 return bout.toByteArray();
487 } finally {
488 input.close();
489 }
490 }
491}
Note: See TracBrowser for help on using the repository browser.