Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Infrastructure for async requests
4 : Copyright (C) Volker Lendecke 2008
5 : Copyright (C) Stefan Metzmacher 2009
6 :
7 : ** NOTE! The following LGPL license applies to the tevent
8 : ** library. This does NOT imply that all of Samba is released
9 : ** under the LGPL
10 :
11 : This library is free software; you can redistribute it and/or
12 : modify it under the terms of the GNU Lesser General Public
13 : License as published by the Free Software Foundation; either
14 : version 3 of the License, or (at your option) any later version.
15 :
16 : This library is distributed in the hope that it will be useful,
17 : but WITHOUT ANY WARRANTY; without even the implied warranty of
18 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 : Lesser General Public License for more details.
20 :
21 : You should have received a copy of the GNU Lesser General Public
22 : License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 : */
24 :
25 : #include "replace.h"
26 : #include "tevent.h"
27 : #include "tevent_internal.h"
28 : #include "tevent_util.h"
29 :
30 : struct tevent_queue_entry {
31 : struct tevent_queue_entry *prev, *next;
32 : struct tevent_queue *queue;
33 :
34 : bool triggered;
35 :
36 : struct tevent_req *req;
37 : struct tevent_context *ev;
38 :
39 : tevent_queue_trigger_fn_t trigger;
40 : void *private_data;
41 : };
42 :
43 : struct tevent_queue {
44 : const char *name;
45 : const char *location;
46 :
47 : bool running;
48 : struct tevent_immediate *immediate;
49 :
50 : size_t length;
51 : struct tevent_queue_entry *list;
52 : };
53 :
54 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
55 : struct tevent_immediate *im,
56 : void *private_data);
57 :
58 80929853 : static int tevent_queue_entry_destructor(struct tevent_queue_entry *e)
59 : {
60 80929853 : struct tevent_queue *q = e->queue;
61 :
62 80929853 : if (!q) {
63 0 : return 0;
64 : }
65 :
66 80929853 : DLIST_REMOVE(q->list, e);
67 80929853 : q->length--;
68 :
69 80929853 : if (!q->running) {
70 2871 : return 0;
71 : }
72 :
73 80926949 : if (!q->list) {
74 79708177 : return 0;
75 : }
76 :
77 1168904 : if (q->list->triggered) {
78 474 : return 0;
79 : }
80 :
81 1166379 : tevent_schedule_immediate(q->immediate,
82 : q->list->ev,
83 : tevent_queue_immediate_trigger,
84 : q);
85 :
86 1166379 : return 0;
87 : }
88 :
89 3413418 : static int tevent_queue_destructor(struct tevent_queue *q)
90 : {
91 3413418 : q->running = false;
92 :
93 6560970 : while (q->list) {
94 0 : struct tevent_queue_entry *e = q->list;
95 0 : talloc_free(e);
96 : }
97 :
98 3413418 : return 0;
99 : }
100 :
101 3440244 : struct tevent_queue *_tevent_queue_create(TALLOC_CTX *mem_ctx,
102 : const char *name,
103 : const char *location)
104 : {
105 : struct tevent_queue *queue;
106 :
107 3440244 : queue = talloc_zero(mem_ctx, struct tevent_queue);
108 3440244 : if (!queue) {
109 0 : return NULL;
110 : }
111 :
112 3440244 : queue->name = talloc_strdup(queue, name);
113 3440244 : if (!queue->name) {
114 0 : talloc_free(queue);
115 0 : return NULL;
116 : }
117 3440244 : queue->immediate = tevent_create_immediate(queue);
118 3440244 : if (!queue->immediate) {
119 0 : talloc_free(queue);
120 0 : return NULL;
121 : }
122 :
123 3440244 : queue->location = location;
124 :
125 : /* queue is running by default */
126 3440244 : queue->running = true;
127 :
128 3440244 : talloc_set_destructor(queue, tevent_queue_destructor);
129 3440244 : return queue;
130 : }
131 :
132 2019883 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
133 : struct tevent_immediate *im,
134 : void *private_data)
135 : {
136 1741592 : struct tevent_queue *q =
137 278291 : talloc_get_type_abort(private_data,
138 : struct tevent_queue);
139 :
140 2019883 : if (!q->running) {
141 0 : return;
142 : }
143 :
144 2019883 : if (!q->list) {
145 0 : return;
146 : }
147 :
148 2019883 : q->list->triggered = true;
149 2019883 : q->list->trigger(q->list->req, q->list->private_data);
150 : }
151 :
152 80931663 : static struct tevent_queue_entry *tevent_queue_add_internal(
153 : struct tevent_queue *queue,
154 : struct tevent_context *ev,
155 : struct tevent_req *req,
156 : tevent_queue_trigger_fn_t trigger,
157 : void *private_data,
158 : bool allow_direct)
159 : {
160 : struct tevent_queue_entry *e;
161 :
162 80931663 : e = talloc_zero(req, struct tevent_queue_entry);
163 80931663 : if (e == NULL) {
164 0 : return NULL;
165 : }
166 :
167 80931663 : e->queue = queue;
168 80931663 : e->req = req;
169 80931663 : e->ev = ev;
170 80931663 : e->trigger = trigger;
171 80931663 : e->private_data = private_data;
172 :
173 : /*
174 : * if there is no trigger, it is just a blocker
175 : */
176 80931663 : if (trigger == NULL) {
177 0 : e->triggered = true;
178 : }
179 :
180 80931663 : if (queue->length > 0) {
181 : /*
182 : * if there are already entries in the
183 : * queue do not optimize.
184 : */
185 1170481 : allow_direct = false;
186 : }
187 :
188 80931663 : if (req->async.fn != NULL) {
189 : /*
190 : * If the caller wants to optimize for the
191 : * empty queue case, call the trigger only
192 : * if there is no callback defined for the
193 : * request yet.
194 : */
195 0 : allow_direct = false;
196 : }
197 :
198 80931663 : DLIST_ADD_END(queue->list, e);
199 80931663 : queue->length++;
200 80931663 : talloc_set_destructor(e, tevent_queue_entry_destructor);
201 :
202 80931663 : if (!queue->running) {
203 24 : return e;
204 : }
205 :
206 80931633 : if (queue->list->triggered) {
207 912062 : return e;
208 : }
209 :
210 : /*
211 : * If allowed we directly call the trigger
212 : * avoiding possible delays caused by
213 : * an immediate event.
214 : */
215 79993329 : if (allow_direct) {
216 78907625 : queue->list->triggered = true;
217 156379671 : queue->list->trigger(queue->list->req,
218 78865217 : queue->list->private_data);
219 78907625 : return e;
220 : }
221 :
222 1085704 : tevent_schedule_immediate(queue->immediate,
223 : queue->list->ev,
224 : tevent_queue_immediate_trigger,
225 : queue);
226 :
227 1085704 : return e;
228 : }
229 :
230 834458 : bool tevent_queue_add(struct tevent_queue *queue,
231 : struct tevent_context *ev,
232 : struct tevent_req *req,
233 : tevent_queue_trigger_fn_t trigger,
234 : void *private_data)
235 : {
236 : struct tevent_queue_entry *e;
237 :
238 834458 : e = tevent_queue_add_internal(queue, ev, req,
239 : trigger, private_data, false);
240 834458 : if (e == NULL) {
241 0 : return false;
242 : }
243 :
244 834458 : return true;
245 : }
246 :
247 59260 : struct tevent_queue_entry *tevent_queue_add_entry(
248 : struct tevent_queue *queue,
249 : struct tevent_context *ev,
250 : struct tevent_req *req,
251 : tevent_queue_trigger_fn_t trigger,
252 : void *private_data)
253 : {
254 59260 : return tevent_queue_add_internal(queue, ev, req,
255 : trigger, private_data, false);
256 : }
257 :
258 80037945 : struct tevent_queue_entry *tevent_queue_add_optimize_empty(
259 : struct tevent_queue *queue,
260 : struct tevent_context *ev,
261 : struct tevent_req *req,
262 : tevent_queue_trigger_fn_t trigger,
263 : void *private_data)
264 : {
265 80037945 : return tevent_queue_add_internal(queue, ev, req,
266 : trigger, private_data, true);
267 : }
268 :
269 54 : void tevent_queue_entry_untrigger(struct tevent_queue_entry *entry)
270 : {
271 54 : if (entry->queue->running) {
272 0 : abort();
273 : }
274 :
275 54 : if (entry->queue->list != entry) {
276 0 : abort();
277 : }
278 :
279 54 : entry->triggered = false;
280 54 : }
281 :
282 64792 : void tevent_queue_start(struct tevent_queue *queue)
283 : {
284 64792 : if (queue->running) {
285 : /* already started */
286 64738 : return;
287 : }
288 :
289 54 : queue->running = true;
290 :
291 54 : if (!queue->list) {
292 0 : return;
293 : }
294 :
295 54 : if (queue->list->triggered) {
296 0 : return;
297 : }
298 :
299 54 : tevent_schedule_immediate(queue->immediate,
300 : queue->list->ev,
301 : tevent_queue_immediate_trigger,
302 : queue);
303 : }
304 :
305 93645 : void tevent_queue_stop(struct tevent_queue *queue)
306 : {
307 93645 : queue->running = false;
308 93645 : }
309 :
310 60862442 : size_t tevent_queue_length(struct tevent_queue *queue)
311 : {
312 60862442 : return queue->length;
313 : }
314 :
315 0 : bool tevent_queue_running(struct tevent_queue *queue)
316 : {
317 0 : return queue->running;
318 : }
319 :
320 : struct tevent_queue_wait_state {
321 : uint8_t dummy;
322 : };
323 :
324 : static void tevent_queue_wait_trigger(struct tevent_req *req,
325 : void *private_data);
326 :
327 377697 : struct tevent_req *tevent_queue_wait_send(TALLOC_CTX *mem_ctx,
328 : struct tevent_context *ev,
329 : struct tevent_queue *queue)
330 : {
331 : struct tevent_req *req;
332 : struct tevent_queue_wait_state *state;
333 : bool ok;
334 :
335 377697 : req = tevent_req_create(mem_ctx, &state,
336 : struct tevent_queue_wait_state);
337 377697 : if (req == NULL) {
338 0 : return NULL;
339 : }
340 :
341 377697 : ok = tevent_queue_add(queue, ev, req,
342 : tevent_queue_wait_trigger,
343 : NULL);
344 377697 : if (!ok) {
345 0 : tevent_req_oom(req);
346 0 : return tevent_req_post(req, ev);
347 : }
348 :
349 370895 : return req;
350 : }
351 :
352 377590 : static void tevent_queue_wait_trigger(struct tevent_req *req,
353 : void *private_data)
354 : {
355 377590 : tevent_req_done(req);
356 377590 : }
357 :
358 377490 : bool tevent_queue_wait_recv(struct tevent_req *req)
359 : {
360 : enum tevent_req_state state;
361 : uint64_t err;
362 :
363 377490 : if (tevent_req_is_error(req, &state, &err)) {
364 0 : tevent_req_received(req);
365 0 : return false;
366 : }
367 :
368 377490 : tevent_req_received(req);
369 377490 : return true;
370 : }
|