source: josm/trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java@ 17374

Last change on this file since 17374 was 17374, checked in by GerdP, 3 years ago

see #20167: [patch] Improve code readability by replacing indexed loops with foreach
Patch by gaben, slightly modified
I removed the changes for

  • GpxImageCorrelation.java, they introduce a TODO
  • ConnectivityRelations.java (no improvement in readability)
  • Property svn:eol-style set to native
File size: 6.3 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.io.IOException;
5import java.net.URL;
6import java.util.Map;
7import java.util.concurrent.ConcurrentHashMap;
8import java.util.concurrent.LinkedBlockingDeque;
9import java.util.concurrent.Semaphore;
10import java.util.concurrent.TimeUnit;
11
12import org.openstreetmap.josm.tools.Logging;
13
14/**
15 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
16 * and it will set a runnable task with semaphore release, when job has finished.
17 * <p>
18 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
19 * guarantee that all threads will be busy, when there is work for them[2]. <br>
20 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
21 * tasks do not go through the Queue <br>
22 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
23 * take the first available job and wait for semaphore. It might be the case, that semaphore was released
24 * for some task further in queue, but this implementation doesn't try to detect such situation
25 *
26 * @author Wiktor Niesiobędzki
27 */
28public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
29 private static final long serialVersionUID = 1L;
30
31 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
32 private final int hostLimit;
33
34 /**
35 * Creates an unbounded queue
36 * @param hostLimit how many parallel calls to host to allow
37 */
38 public HostLimitQueue(int hostLimit) {
39 super(); // create unbounded queue
40 this.hostLimit = hostLimit;
41 }
42
43 /**
44 * Creates bounded queue
45 * @param hostLimit how many parallel calls to host to allow
46 * @param queueLimit how deep the queue should be
47 */
48 public HostLimitQueue(int hostLimit, int queueLimit) {
49 super(queueLimit); // create bounded queue
50 this.hostLimit = hostLimit;
51 }
52
53 private JCSCachedTileLoaderJob<?, ?> findJob() {
54 for (Runnable r : this) {
55 if (r instanceof JCSCachedTileLoaderJob) {
56 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
57 if (tryAcquireSemaphore(job)) {
58 if (remove(job)) {
59 return job;
60 } else {
61 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
62 // release the semaphore and look for another candidate
63 releaseSemaphore(job);
64 }
65 } else {
66 URL url = null;
67 try {
68 url = job.getUrl();
69 } catch (IOException e) {
70 Logging.debug(e);
71 }
72 Logging.debug("TMS - Skipping job {0} because host limit reached", url);
73 }
74 }
75 }
76 return null;
77 }
78
79 @Override
80 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
81 Runnable job = findJob();
82 if (job != null) {
83 return job;
84 }
85 job = pollFirst(timeout, unit);
86 if (job != null) {
87 try {
88 boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
89 return gotLock ? job : null;
90 } catch (InterruptedException e) {
91 // acquire my got interrupted, first offer back what was taken
92 if (!offer(job)) {
93 Logging.warn("Unable to offer back " + job);
94 }
95 throw e;
96 }
97 }
98 return job;
99 }
100
101 @Override
102 public Runnable take() throws InterruptedException {
103 Runnable job = findJob();
104 if (job != null) {
105 return job;
106 }
107 job = takeFirst();
108 try {
109 acquireSemaphore(job);
110 } catch (InterruptedException e) {
111 // acquire my got interrupted, first offer back what was taken
112 if (!offer(job)) {
113 Logging.warn("Unable to offer back " + job);
114 }
115 throw e;
116 }
117 return job;
118 }
119
120 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
121 String host;
122 try {
123 host = job.getUrl().getHost();
124 } catch (IOException e) {
125 // do not pass me illegal URL's
126 throw new IllegalArgumentException(e);
127 }
128 Semaphore limit = hostSemaphores.get(host);
129 if (limit == null) {
130 limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit));
131 }
132 return limit;
133 }
134
135 private void acquireSemaphore(Runnable job) throws InterruptedException {
136 if (job instanceof JCSCachedTileLoaderJob) {
137 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
138 getSemaphore(jcsJob).acquire();
139 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
140 }
141 }
142
143 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
144 boolean ret = true;
145 Semaphore limit = getSemaphore(job);
146 if (limit != null) {
147 ret = limit.tryAcquire();
148 if (ret) {
149 job.setFinishedTask(() -> releaseSemaphore(job));
150 }
151 }
152 return ret;
153 }
154
155 private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
156 boolean ret = true;
157 if (job instanceof JCSCachedTileLoaderJob) {
158 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
159 Semaphore limit = getSemaphore(jcsJob);
160 if (limit != null) {
161 ret = limit.tryAcquire(timeout, unit);
162 if (ret) {
163 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
164 }
165 }
166 }
167 return ret;
168 }
169
170 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
171 Semaphore limit = getSemaphore(job);
172 if (limit != null) {
173 limit.release();
174 if (limit.availablePermits() > hostLimit) {
175 Logging.warn("More permits than it should be");
176 }
177 }
178 }
179}
Note: See TracBrowser for help on using the repository browser.