source: josm/trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java@ 10841

Last change on this file since 10841 was 10723, checked in by simon04, 8 years ago

Fix javadoc

  • Property svn:eol-style set to native
File size: 5.2 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.io.IOException;
5import java.net.URL;
6import java.util.Iterator;
7import java.util.Map;
8import java.util.concurrent.ConcurrentHashMap;
9import java.util.concurrent.LinkedBlockingDeque;
10import java.util.concurrent.Semaphore;
11import java.util.concurrent.TimeUnit;
12
13import org.openstreetmap.josm.Main;
14
15/**
16 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
17 * and it will set a runnable task with semaphore release, when job has finished.
18 * <p>
19 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
20 * guarantee that all threads will be busy, when there is work for them[2]. <br>
21 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
22 * tasks do not go through the Queue <br>
23 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
24 * take the first available job and wait for semaphore. It might be the case, that semaphore was released
25 * for some task further in queue, but this implementation doesn't try to detect such situation
26 *
27 * @author Wiktor Niesiobędzki
28 */
29public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
30 private static final long serialVersionUID = 1L;
31
32 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
33 private final int hostLimit;
34
35 /**
36 * Creates an unbounded queue
37 * @param hostLimit how many parallel calls to host to allow
38 */
39 public HostLimitQueue(int hostLimit) {
40 super(); // create unbounded queue
41 this.hostLimit = hostLimit;
42 }
43
44 private JCSCachedTileLoaderJob<?, ?> findJob() {
45 for (Iterator<Runnable> it = iterator(); it.hasNext();) {
46 Runnable r = it.next();
47 if (r instanceof JCSCachedTileLoaderJob) {
48 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
49 if (tryAcquireSemaphore(job)) {
50 if (remove(job)) {
51 return job;
52 } else {
53 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
54 // release the semaphore and look for another candidate
55 releaseSemaphore(job);
56 }
57 } else {
58 URL url = null;
59 try {
60 url = job.getUrl();
61 } catch (IOException e) {
62 Main.debug(e);
63 }
64 Main.debug("TMS - Skipping job {0} because host limit reached", url);
65 }
66 }
67 }
68 return null;
69 }
70
71 @Override
72 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
73 Runnable job = findJob();
74 if (job != null) {
75 return job;
76 }
77 job = pollFirst(timeout, unit);
78 if (job != null) {
79 acquireSemaphore(job);
80 }
81 return job;
82 }
83
84 @Override
85 public Runnable take() throws InterruptedException {
86 Runnable job = findJob();
87 if (job != null) {
88 return job;
89 }
90 job = takeFirst();
91 if (job != null) {
92 acquireSemaphore(job);
93 }
94 return job;
95 }
96
97 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
98 String host;
99 try {
100 host = job.getUrl().getHost();
101 } catch (IOException e) {
102 // do not pass me illegal URL's
103 throw new IllegalArgumentException(e);
104 }
105 Semaphore limit = hostSemaphores.get(host);
106 if (limit == null) {
107 synchronized (hostSemaphores) {
108 limit = hostSemaphores.get(host);
109 if (limit == null) {
110 limit = new Semaphore(hostLimit);
111 hostSemaphores.put(host, limit);
112 }
113 }
114 }
115 return limit;
116 }
117
118 private void acquireSemaphore(Runnable job) throws InterruptedException {
119 if (job instanceof JCSCachedTileLoaderJob) {
120 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
121 Semaphore limit = getSemaphore(jcsJob);
122 if (limit != null) {
123 limit.acquire();
124 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
125 }
126 }
127 }
128
129 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
130 boolean ret = true;
131 Semaphore limit = getSemaphore(job);
132 if (limit != null) {
133 ret = limit.tryAcquire();
134 if (ret) {
135 job.setFinishedTask(() -> releaseSemaphore(job));
136 }
137 }
138 return ret;
139 }
140
141 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
142 Semaphore limit = getSemaphore(job);
143 if (limit != null) {
144 limit.release();
145 if (limit.availablePermits() > hostLimit) {
146 Main.warn("More permits than it should be");
147 }
148 }
149 }
150}
Note: See TracBrowser for help on using the repository browser.