source: josm/trunk/src/org/openstreetmap/josm/data/cache/HostLimitQueue.java@ 8783

Last change on this file since 8783 was 8783, checked in by simon04, 9 years ago

HostLimitQueue: log "host limit reached" as DEBUG message

  • Property svn:eol-style set to native
File size: 5.6 KB
Line 
1// License: GPL. For details, see LICENSE file.
2package org.openstreetmap.josm.data.cache;
3
4import java.io.IOException;
5import java.net.URL;
6import java.util.Iterator;
7import java.util.Map;
8import java.util.concurrent.ConcurrentHashMap;
9import java.util.concurrent.LinkedBlockingDeque;
10import java.util.concurrent.Semaphore;
11import java.util.concurrent.TimeUnit;
12
13import org.openstreetmap.josm.Main;
14
15/**
16 * @author Wiktor Niesiobędzki
17 *
18 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
19 * and it will set a runnable task with semaphore release, when job has finished.
20 *
21 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
22 * guarantee that all threads will be busy, when there is work for them[2].
23 *
24 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
25 * tasks do not go through the Queue
26 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
27 * take the first available job and wait for semaphore. It might be the case, that semaphore was released
28 * for some task further in queue, but this implementation doesn't try to detect such situation
29 *
30 *
31 */
32public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
33 private static final long serialVersionUID = 1L;
34
35 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
36 private final int hostLimit;
37
38 /**
39 * Creates an unbounded queue
40 * @param hostLimit how many parallel calls to host to allow
41 */
42 public HostLimitQueue(int hostLimit) {
43 super(); // create unbounded queue
44 this.hostLimit = hostLimit;
45 }
46
47 private JCSCachedTileLoaderJob<?, ?> findJob() {
48 for (Iterator<Runnable> it = iterator(); it.hasNext();) {
49 Runnable r = it.next();
50 if (r instanceof JCSCachedTileLoaderJob) {
51 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
52 if (tryAcquireSemaphore(job)) {
53 if (remove(job)) {
54 return job;
55 } else {
56 // we have acquired the semaphore, but we didn't manage to remove it, as someone else did
57 // release the semaphore and look for another candidate
58 releaseSemaphore(job);
59 }
60 } else {
61 URL url = null;
62 try {
63 url = job.getUrl();
64 } catch (IOException e) {
65 if (Main.isDebugEnabled()) {
66 Main.debug(e.getMessage());
67 }
68 }
69 Main.debug("TMS - Queuing job {0} because host limit reached", url);
70 }
71 }
72 }
73 return null;
74 }
75
76 @Override
77 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
78 Runnable job = findJob();
79 if (job != null) {
80 return job;
81 }
82 job = pollFirst(timeout, unit);
83 if (job != null) {
84 acquireSemaphore(job);
85 }
86 return job;
87 }
88
89 @Override
90 public Runnable take() throws InterruptedException {
91 Runnable job = findJob();
92 if (job != null) {
93 return job;
94 }
95 job = takeFirst();
96 if (job != null) {
97 acquireSemaphore(job);
98 }
99 return job;
100 }
101
102 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
103 String host;
104 try {
105 host = job.getUrl().getHost();
106 } catch (IOException e) {
107 // do not pass me illegal URL's
108 throw new IllegalArgumentException(e);
109 }
110 Semaphore limit = hostSemaphores.get(host);
111 if (limit == null) {
112 synchronized (hostSemaphores) {
113 limit = hostSemaphores.get(host);
114 if (limit == null) {
115 limit = new Semaphore(hostLimit);
116 hostSemaphores.put(host, limit);
117 }
118 }
119 }
120 return limit;
121 }
122
123 private void acquireSemaphore(Runnable job) throws InterruptedException {
124 if (job instanceof JCSCachedTileLoaderJob) {
125 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
126 Semaphore limit = getSemaphore(jcsJob);
127 if (limit != null) {
128 limit.acquire();
129 jcsJob.setFinishedTask(new Runnable() {
130 @Override
131 public void run() {
132 releaseSemaphore(jcsJob);
133 }
134 });
135 }
136 }
137 }
138
139 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
140 boolean ret = true;
141 Semaphore limit = getSemaphore(job);
142 if (limit != null) {
143 ret = limit.tryAcquire();
144 if (ret) {
145 job.setFinishedTask(new Runnable() {
146 @Override
147 public void run() {
148 releaseSemaphore(job);
149 }
150 });
151 }
152 }
153 return ret;
154 }
155
156 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
157 Semaphore limit = getSemaphore(job);
158 if (limit != null) {
159 limit.release();
160 if (limit.availablePermits() > hostLimit) {
161 Main.warn("More permits than it should be");
162 }
163 }
164 }
165}
Note: See TracBrowser for help on using the repository browser.