Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 :
4 : testing of the events subsystem
5 :
6 : Copyright (C) Stefan Metzmacher 2006-2009
7 : Copyright (C) Jeremy Allison 2013
8 :
9 : ** NOTE! The following LGPL license applies to the tevent
10 : ** library. This does NOT imply that all of Samba is released
11 : ** under the LGPL
12 :
13 : This library is free software; you can redistribute it and/or
14 : modify it under the terms of the GNU Lesser General Public
15 : License as published by the Free Software Foundation; either
16 : version 3 of the License, or (at your option) any later version.
17 :
18 : This library is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 : Lesser General Public License for more details.
22 :
23 : You should have received a copy of the GNU Lesser General Public
24 : License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 : */
26 :
27 : #include "includes.h"
28 : #define TEVENT_DEPRECATED 1
29 : #include "tevent.h"
30 : #include "system/filesys.h"
31 : #include "system/select.h"
32 : #include "system/network.h"
33 : #include "torture/torture.h"
34 : #include "torture/local/proto.h"
35 : #ifdef HAVE_PTHREAD
36 : #include "system/threads.h"
37 : #include <assert.h>
38 : #endif
39 :
40 : static int fde_count;
41 :
42 273443 : static void do_read(int fd, void *buf, size_t count)
43 : {
44 : ssize_t ret;
45 :
46 : do {
47 273443 : ret = read(fd, buf, count);
48 273443 : } while (ret == -1 && errno == EINTR);
49 273443 : }
50 :
51 273443 : static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
52 : uint16_t flags, void *private_data)
53 : {
54 273443 : int *fd = (int *)private_data;
55 : char c;
56 : #ifdef SA_SIGINFO
57 273443 : kill(getpid(), SIGUSR1);
58 : #endif
59 273443 : kill(getpid(), SIGALRM);
60 :
61 273443 : do_read(fd[0], &c, 1);
62 273443 : fde_count++;
63 273443 : }
64 :
65 273475 : static void do_write(int fd, void *buf, size_t count)
66 : {
67 : ssize_t ret;
68 :
69 : do {
70 273475 : ret = write(fd, buf, count);
71 273475 : } while (ret == -1 && errno == EINTR);
72 273475 : }
73 :
74 273445 : static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
75 : uint16_t flags, void *private_data)
76 : {
77 273445 : int *fd = (int *)private_data;
78 273445 : char c = 0;
79 :
80 273445 : do_write(fd[1], &c, 1);
81 273445 : }
82 :
83 :
84 : /* This will only fire if the fd's returned from pipe() are bi-directional. */
85 0 : static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
86 : uint16_t flags, void *private_data)
87 : {
88 0 : int *fd = (int *)private_data;
89 : char c;
90 : #ifdef SA_SIGINFO
91 0 : kill(getpid(), SIGUSR1);
92 : #endif
93 0 : kill(getpid(), SIGALRM);
94 :
95 0 : do_read(fd[1], &c, 1);
96 0 : fde_count++;
97 0 : }
98 :
99 : /* This will only fire if the fd's returned from pipe() are bi-directional. */
100 0 : static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
101 : uint16_t flags, void *private_data)
102 : {
103 0 : int *fd = (int *)private_data;
104 0 : char c = 0;
105 0 : do_write(fd[0], &c, 1);
106 0 : }
107 :
108 4 : static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
109 : struct timeval tval, void *private_data)
110 : {
111 4 : int *finished = (int *)private_data;
112 4 : (*finished) = 1;
113 4 : }
114 :
115 546890 : static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
116 : int signum, int count, void *info, void *private_data)
117 : {
118 546890 : int *countp = (int *)private_data;
119 546890 : (*countp) += count;
120 546890 : }
121 :
122 4 : static bool test_event_context(struct torture_context *test,
123 : const void *test_data)
124 : {
125 : struct tevent_context *ev_ctx;
126 4 : int fd[2] = { -1, -1 };
127 4 : const char *backend = (const char *)test_data;
128 4 : int alarm_count=0, info_count=0;
129 : struct tevent_fd *fde_read;
130 : struct tevent_fd *fde_read_1;
131 : struct tevent_fd *fde_write;
132 : struct tevent_fd *fde_write_1;
133 : #ifdef SA_RESTART
134 4 : struct tevent_signal *se1 = NULL;
135 : #endif
136 : #ifdef SA_RESETHAND
137 4 : struct tevent_signal *se2 = NULL;
138 : #endif
139 : #ifdef SA_SIGINFO
140 4 : struct tevent_signal *se3 = NULL;
141 : #endif
142 4 : int finished=0;
143 : struct timeval t;
144 : int ret;
145 :
146 4 : ev_ctx = tevent_context_init_byname(test, backend);
147 4 : if (ev_ctx == NULL) {
148 0 : torture_comment(test, "event backend '%s' not supported\n", backend);
149 0 : return true;
150 : }
151 :
152 4 : torture_comment(test, "backend '%s' - %s\n",
153 : backend, __FUNCTION__);
154 :
155 : /* reset globals */
156 4 : fde_count = 0;
157 :
158 : /* create a pipe */
159 4 : ret = pipe(fd);
160 4 : torture_assert_int_equal(test, ret, 0, "pipe failed");
161 :
162 4 : fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
163 : fde_handler_read, fd);
164 4 : fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
165 : fde_handler_write_1, fd);
166 :
167 4 : fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
168 : fde_handler_write, fd);
169 4 : fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
170 : fde_handler_read_1, fd);
171 :
172 4 : tevent_fd_set_auto_close(fde_read);
173 4 : tevent_fd_set_auto_close(fde_write);
174 :
175 4 : tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
176 : finished_handler, &finished);
177 :
178 : #ifdef SA_RESTART
179 4 : se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
180 4 : torture_assert(test, se1 != NULL, "failed to setup se1");
181 : #endif
182 : #ifdef SA_RESETHAND
183 4 : se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
184 4 : torture_assert(test, se2 != NULL, "failed to setup se2");
185 : #endif
186 : #ifdef SA_SIGINFO
187 4 : se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
188 4 : torture_assert(test, se3 != NULL, "failed to setup se3");
189 : #endif
190 :
191 4 : t = timeval_current();
192 4 : while (!finished) {
193 1093773 : errno = 0;
194 1093773 : if (tevent_loop_once(ev_ctx) == -1) {
195 0 : TALLOC_FREE(ev_ctx);
196 0 : torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
197 : return false;
198 : }
199 : }
200 :
201 4 : talloc_free(fde_read_1);
202 4 : talloc_free(fde_write_1);
203 4 : talloc_free(fde_read);
204 4 : talloc_free(fde_write);
205 :
206 8 : while (alarm_count < fde_count+1) {
207 0 : if (tevent_loop_once(ev_ctx) == -1) {
208 0 : break;
209 : }
210 : }
211 :
212 4 : torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
213 :
214 : #ifdef SA_RESTART
215 4 : talloc_free(se1);
216 : #endif
217 :
218 4 : torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
219 :
220 : #ifdef SA_RESETHAND
221 : /*
222 : * we do not call talloc_free(se2)
223 : * because it is already gone,
224 : * after triggering the event handler.
225 : */
226 : #endif
227 :
228 : #ifdef SA_SIGINFO
229 4 : talloc_free(se3);
230 4 : torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
231 : #endif
232 :
233 4 : talloc_free(ev_ctx);
234 :
235 4 : return true;
236 : }
237 :
238 : struct test_event_fd1_state {
239 : struct torture_context *tctx;
240 : const char *backend;
241 : struct tevent_context *ev;
242 : int sock[2];
243 : struct tevent_timer *te;
244 : struct tevent_fd *fde0;
245 : struct tevent_fd *fde1;
246 : bool got_write;
247 : bool got_read;
248 : bool drain;
249 : bool drain_done;
250 : unsigned loop_count;
251 : bool finished;
252 : const char *error;
253 : };
254 :
255 40 : static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
256 : struct tevent_fd *fde,
257 : uint16_t flags,
258 : void *private_data)
259 : {
260 40 : struct test_event_fd1_state *state =
261 : (struct test_event_fd1_state *)private_data;
262 :
263 40 : if (state->drain_done) {
264 0 : state->finished = true;
265 0 : state->error = __location__;
266 0 : return;
267 : }
268 :
269 40 : if (state->drain) {
270 : ssize_t ret;
271 16 : uint8_t c = 0;
272 :
273 16 : if (!(flags & TEVENT_FD_READ)) {
274 0 : state->finished = true;
275 0 : state->error = __location__;
276 0 : return;
277 : }
278 :
279 32 : ret = read(state->sock[0], &c, 1);
280 16 : if (ret == 1) {
281 0 : return;
282 : }
283 :
284 : /*
285 : * end of test...
286 : */
287 4 : tevent_fd_set_flags(fde, 0);
288 4 : state->drain_done = true;
289 4 : return;
290 : }
291 :
292 24 : if (!state->got_write) {
293 12 : uint8_t c = 0;
294 :
295 12 : if (flags != TEVENT_FD_WRITE) {
296 0 : state->finished = true;
297 0 : state->error = __location__;
298 0 : return;
299 : }
300 12 : state->got_write = true;
301 :
302 : /*
303 : * we write to the other socket...
304 : */
305 12 : do_write(state->sock[1], &c, 1);
306 12 : TEVENT_FD_NOT_WRITEABLE(fde);
307 12 : TEVENT_FD_READABLE(fde);
308 12 : return;
309 : }
310 :
311 12 : if (!state->got_read) {
312 12 : if (flags != TEVENT_FD_READ) {
313 0 : state->finished = true;
314 0 : state->error = __location__;
315 0 : return;
316 : }
317 12 : state->got_read = true;
318 :
319 12 : TEVENT_FD_NOT_READABLE(fde);
320 12 : return;
321 : }
322 :
323 0 : state->finished = true;
324 0 : state->error = __location__;
325 0 : return;
326 : }
327 :
328 16 : static void test_event_fd1_finished(struct tevent_context *ev_ctx,
329 : struct tevent_timer *te,
330 : struct timeval tval,
331 : void *private_data)
332 : {
333 16 : struct test_event_fd1_state *state =
334 : (struct test_event_fd1_state *)private_data;
335 :
336 16 : if (state->drain_done) {
337 4 : state->finished = true;
338 4 : return;
339 : }
340 :
341 12 : if (!state->got_write) {
342 0 : state->finished = true;
343 0 : state->error = __location__;
344 0 : return;
345 : }
346 :
347 12 : if (!state->got_read) {
348 0 : state->finished = true;
349 0 : state->error = __location__;
350 0 : return;
351 : }
352 :
353 12 : state->loop_count++;
354 12 : if (state->loop_count > 3) {
355 0 : state->finished = true;
356 0 : state->error = __location__;
357 0 : return;
358 : }
359 :
360 12 : state->got_write = false;
361 12 : state->got_read = false;
362 :
363 12 : tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
364 :
365 12 : if (state->loop_count > 2) {
366 4 : state->drain = true;
367 4 : TALLOC_FREE(state->fde1);
368 4 : TEVENT_FD_READABLE(state->fde0);
369 : }
370 :
371 12 : state->te = tevent_add_timer(state->ev, state->ev,
372 : timeval_current_ofs(0,2000),
373 : test_event_fd1_finished, state);
374 : }
375 :
376 4 : static bool test_event_fd1(struct torture_context *tctx,
377 : const void *test_data)
378 : {
379 : struct test_event_fd1_state state;
380 : int ret;
381 :
382 4 : ZERO_STRUCT(state);
383 4 : state.tctx = tctx;
384 4 : state.backend = (const char *)test_data;
385 :
386 4 : state.ev = tevent_context_init_byname(tctx, state.backend);
387 4 : if (state.ev == NULL) {
388 0 : torture_skip(tctx, talloc_asprintf(tctx,
389 : "event backend '%s' not supported\n",
390 : state.backend));
391 : return true;
392 : }
393 :
394 4 : tevent_set_debug_stderr(state.ev);
395 4 : torture_comment(tctx, "backend '%s' - %s\n",
396 : state.backend, __FUNCTION__);
397 :
398 : /*
399 : * This tests the following:
400 : *
401 : * It monitors the state of state.sock[0]
402 : * with tevent_fd, but we never read/write on state.sock[0]
403 : * while state.sock[1] * is only used to write a few bytes.
404 : *
405 : * We have a loop:
406 : * - we wait only for TEVENT_FD_WRITE on state.sock[0]
407 : * - we write 1 byte to state.sock[1]
408 : * - we wait only for TEVENT_FD_READ on state.sock[0]
409 : * - we disable events on state.sock[0]
410 : * - the timer event restarts the loop
411 : * Then we close state.sock[1]
412 : * We have a loop:
413 : * - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
414 : * - we try to read 1 byte
415 : * - if the read gets an error of returns 0
416 : * we disable the event handler
417 : * - the timer finishes the test
418 : */
419 4 : state.sock[0] = -1;
420 4 : state.sock[1] = -1;
421 :
422 4 : ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
423 4 : torture_assert(tctx, ret == 0, "socketpair() failed");
424 :
425 4 : state.te = tevent_add_timer(state.ev, state.ev,
426 : timeval_current_ofs(0,10000),
427 : test_event_fd1_finished, &state);
428 4 : state.fde0 = tevent_add_fd(state.ev, state.ev,
429 : state.sock[0], TEVENT_FD_WRITE,
430 : test_event_fd1_fde_handler, &state);
431 : /* state.fde1 is only used to auto close */
432 4 : state.fde1 = tevent_add_fd(state.ev, state.ev,
433 : state.sock[1], 0,
434 : test_event_fd1_fde_handler, &state);
435 :
436 4 : tevent_fd_set_auto_close(state.fde0);
437 4 : tevent_fd_set_auto_close(state.fde1);
438 :
439 4 : while (!state.finished) {
440 64 : errno = 0;
441 64 : if (tevent_loop_once(state.ev) == -1) {
442 0 : talloc_free(state.ev);
443 0 : torture_fail(tctx, talloc_asprintf(tctx,
444 : "Failed event loop %s\n",
445 : strerror(errno)));
446 : }
447 : }
448 :
449 4 : talloc_free(state.ev);
450 :
451 4 : torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
452 : "%s", state.error));
453 :
454 0 : return true;
455 : }
456 :
457 : struct test_event_fd2_state {
458 : struct torture_context *tctx;
459 : const char *backend;
460 : struct tevent_context *ev;
461 : struct tevent_timer *te;
462 : struct test_event_fd2_sock {
463 : struct test_event_fd2_state *state;
464 : int fd;
465 : struct tevent_fd *fde;
466 : size_t num_written;
467 : size_t num_read;
468 : bool got_full;
469 : } sock0, sock1;
470 : bool finished;
471 : const char *error;
472 : };
473 :
474 1116 : static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
475 : struct tevent_fd *fde,
476 : uint16_t flags,
477 : void *private_data)
478 : {
479 1116 : struct test_event_fd2_sock *cur_sock =
480 : (struct test_event_fd2_sock *)private_data;
481 1116 : struct test_event_fd2_state *state = cur_sock->state;
482 1116 : struct test_event_fd2_sock *oth_sock = NULL;
483 1116 : uint8_t v = 0, c;
484 : ssize_t ret;
485 :
486 1116 : if (cur_sock == &state->sock0) {
487 558 : oth_sock = &state->sock1;
488 : } else {
489 0 : oth_sock = &state->sock0;
490 : }
491 :
492 1116 : if (oth_sock->num_written == 1) {
493 4 : if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
494 0 : state->finished = true;
495 0 : state->error = __location__;
496 0 : return;
497 : }
498 : }
499 :
500 1116 : if (cur_sock->num_read == oth_sock->num_written) {
501 0 : state->finished = true;
502 0 : state->error = __location__;
503 0 : return;
504 : }
505 :
506 1116 : if (!(flags & TEVENT_FD_READ)) {
507 0 : state->finished = true;
508 0 : state->error = __location__;
509 0 : return;
510 : }
511 :
512 1116 : if (oth_sock->num_read >= PIPE_BUF) {
513 : /*
514 : * On Linux we become writable once we've read
515 : * one byte. On Solaris we only become writable
516 : * again once we've read 4096 bytes. PIPE_BUF
517 : * is probably a safe bet to test against.
518 : *
519 : * There should be room to write a byte again
520 : */
521 0 : if (!(flags & TEVENT_FD_WRITE)) {
522 0 : state->finished = true;
523 0 : state->error = __location__;
524 0 : return;
525 : }
526 : }
527 :
528 1116 : if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
529 552 : v = (uint8_t)cur_sock->num_written;
530 552 : ret = write(cur_sock->fd, &v, 1);
531 552 : if (ret != 1) {
532 0 : state->finished = true;
533 0 : state->error = __location__;
534 0 : return;
535 : }
536 552 : cur_sock->num_written++;
537 552 : if (cur_sock->num_written > 0x80000000) {
538 0 : state->finished = true;
539 0 : state->error = __location__;
540 0 : return;
541 : }
542 0 : return;
543 : }
544 :
545 564 : if (!cur_sock->got_full) {
546 8 : cur_sock->got_full = true;
547 :
548 8 : if (!oth_sock->got_full) {
549 : /*
550 : * cur_sock is full,
551 : * lets wait for oth_sock
552 : * to be filled
553 : */
554 4 : tevent_fd_set_flags(cur_sock->fde, 0);
555 4 : return;
556 : }
557 :
558 : /*
559 : * oth_sock waited for cur_sock,
560 : * lets restart it
561 : */
562 4 : tevent_fd_set_flags(oth_sock->fde,
563 : TEVENT_FD_READ|TEVENT_FD_WRITE);
564 : }
565 :
566 1120 : ret = read(cur_sock->fd, &v, 1);
567 560 : if (ret != 1) {
568 0 : state->finished = true;
569 0 : state->error = __location__;
570 0 : return;
571 : }
572 560 : c = (uint8_t)cur_sock->num_read;
573 560 : if (c != v) {
574 0 : state->finished = true;
575 0 : state->error = __location__;
576 0 : return;
577 : }
578 560 : cur_sock->num_read++;
579 :
580 560 : if (cur_sock->num_read < oth_sock->num_written) {
581 : /* there is more to read */
582 0 : return;
583 : }
584 : /*
585 : * we read everything, we need to remove TEVENT_FD_WRITE
586 : * to avoid spinning
587 : */
588 8 : TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
589 :
590 8 : if (oth_sock->num_read == cur_sock->num_written) {
591 : /*
592 : * both directions are finished
593 : */
594 4 : state->finished = true;
595 : }
596 :
597 0 : return;
598 : }
599 :
600 0 : static void test_event_fd2_finished(struct tevent_context *ev_ctx,
601 : struct tevent_timer *te,
602 : struct timeval tval,
603 : void *private_data)
604 : {
605 0 : struct test_event_fd2_state *state =
606 : (struct test_event_fd2_state *)private_data;
607 :
608 : /*
609 : * this should never be triggered
610 : */
611 0 : state->finished = true;
612 0 : state->error = __location__;
613 0 : }
614 :
615 4 : static bool test_event_fd2(struct torture_context *tctx,
616 : const void *test_data)
617 : {
618 : struct test_event_fd2_state state;
619 : int sock[2];
620 4 : uint8_t c = 0;
621 :
622 4 : ZERO_STRUCT(state);
623 4 : state.tctx = tctx;
624 4 : state.backend = (const char *)test_data;
625 :
626 4 : state.ev = tevent_context_init_byname(tctx, state.backend);
627 4 : if (state.ev == NULL) {
628 0 : torture_skip(tctx, talloc_asprintf(tctx,
629 : "event backend '%s' not supported\n",
630 : state.backend));
631 : return true;
632 : }
633 :
634 4 : tevent_set_debug_stderr(state.ev);
635 4 : torture_comment(tctx, "backend '%s' - %s\n",
636 : state.backend, __FUNCTION__);
637 :
638 : /*
639 : * This tests the following
640 : *
641 : * - We write 1 byte to each socket
642 : * - We wait for TEVENT_FD_READ/WRITE on both sockets
643 : * - When we get TEVENT_FD_WRITE we write 1 byte
644 : * until both socket buffers are full, which
645 : * means both sockets only get TEVENT_FD_READ.
646 : * - Then we read 1 byte until we have consumed
647 : * all bytes the other end has written.
648 : */
649 4 : sock[0] = -1;
650 4 : sock[1] = -1;
651 4 : socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
652 :
653 : /*
654 : * the timer should never expire
655 : */
656 4 : state.te = tevent_add_timer(state.ev, state.ev,
657 : timeval_current_ofs(600, 0),
658 : test_event_fd2_finished, &state);
659 4 : state.sock0.state = &state;
660 4 : state.sock0.fd = sock[0];
661 4 : state.sock0.fde = tevent_add_fd(state.ev, state.ev,
662 : state.sock0.fd,
663 : TEVENT_FD_READ | TEVENT_FD_WRITE,
664 : test_event_fd2_sock_handler,
665 : &state.sock0);
666 4 : state.sock1.state = &state;
667 4 : state.sock1.fd = sock[1];
668 4 : state.sock1.fde = tevent_add_fd(state.ev, state.ev,
669 : state.sock1.fd,
670 : TEVENT_FD_READ | TEVENT_FD_WRITE,
671 : test_event_fd2_sock_handler,
672 : &state.sock1);
673 :
674 4 : tevent_fd_set_auto_close(state.sock0.fde);
675 4 : tevent_fd_set_auto_close(state.sock1.fde);
676 :
677 4 : do_write(state.sock0.fd, &c, 1);
678 4 : state.sock0.num_written++;
679 4 : do_write(state.sock1.fd, &c, 1);
680 4 : state.sock1.num_written++;
681 :
682 1128 : while (!state.finished) {
683 1120 : errno = 0;
684 1120 : if (tevent_loop_once(state.ev) == -1) {
685 0 : talloc_free(state.ev);
686 0 : torture_fail(tctx, talloc_asprintf(tctx,
687 : "Failed event loop %s\n",
688 : strerror(errno)));
689 : }
690 : }
691 :
692 4 : talloc_free(state.ev);
693 :
694 4 : torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
695 : "%s", state.error));
696 :
697 0 : return true;
698 : }
699 :
700 : struct test_wrapper_state {
701 : struct torture_context *tctx;
702 : int num_events;
703 : int num_wrap_handlers;
704 : };
705 :
706 4 : static bool test_wrapper_before_use(struct tevent_context *wrap_ev,
707 : void *private_data,
708 : struct tevent_context *main_ev,
709 : const char *location)
710 : {
711 4 : struct test_wrapper_state *state =
712 0 : talloc_get_type_abort(private_data,
713 : struct test_wrapper_state);
714 :
715 4 : torture_comment(state->tctx, "%s\n", __func__);
716 4 : state->num_wrap_handlers++;
717 4 : return true;
718 : }
719 :
720 4 : static void test_wrapper_after_use(struct tevent_context *wrap_ev,
721 : void *private_data,
722 : struct tevent_context *main_ev,
723 : const char *location)
724 : {
725 4 : struct test_wrapper_state *state =
726 0 : talloc_get_type_abort(private_data,
727 : struct test_wrapper_state);
728 :
729 4 : torture_comment(state->tctx, "%s\n", __func__);
730 4 : state->num_wrap_handlers++;
731 4 : }
732 :
733 4 : static void test_wrapper_before_fd_handler(struct tevent_context *wrap_ev,
734 : void *private_data,
735 : struct tevent_context *main_ev,
736 : struct tevent_fd *fde,
737 : uint16_t flags,
738 : const char *handler_name,
739 : const char *location)
740 : {
741 4 : struct test_wrapper_state *state =
742 0 : talloc_get_type_abort(private_data,
743 : struct test_wrapper_state);
744 :
745 4 : torture_comment(state->tctx, "%s\n", __func__);
746 4 : state->num_wrap_handlers++;
747 4 : }
748 :
749 4 : static void test_wrapper_after_fd_handler(struct tevent_context *wrap_ev,
750 : void *private_data,
751 : struct tevent_context *main_ev,
752 : struct tevent_fd *fde,
753 : uint16_t flags,
754 : const char *handler_name,
755 : const char *location)
756 : {
757 4 : struct test_wrapper_state *state =
758 0 : talloc_get_type_abort(private_data,
759 : struct test_wrapper_state);
760 :
761 4 : torture_comment(state->tctx, "%s\n", __func__);
762 4 : state->num_wrap_handlers++;
763 4 : }
764 :
765 4 : static void test_wrapper_before_timer_handler(struct tevent_context *wrap_ev,
766 : void *private_data,
767 : struct tevent_context *main_ev,
768 : struct tevent_timer *te,
769 : struct timeval requested_time,
770 : struct timeval trigger_time,
771 : const char *handler_name,
772 : const char *location)
773 : {
774 4 : struct test_wrapper_state *state =
775 0 : talloc_get_type_abort(private_data,
776 : struct test_wrapper_state);
777 :
778 4 : torture_comment(state->tctx, "%s\n", __func__);
779 4 : state->num_wrap_handlers++;
780 4 : }
781 :
782 4 : static void test_wrapper_after_timer_handler(struct tevent_context *wrap_ev,
783 : void *private_data,
784 : struct tevent_context *main_ev,
785 : struct tevent_timer *te,
786 : struct timeval requested_time,
787 : struct timeval trigger_time,
788 : const char *handler_name,
789 : const char *location)
790 : {
791 4 : struct test_wrapper_state *state =
792 0 : talloc_get_type_abort(private_data,
793 : struct test_wrapper_state);
794 :
795 4 : torture_comment(state->tctx, "%s\n", __func__);
796 4 : state->num_wrap_handlers++;
797 4 : }
798 :
799 4 : static void test_wrapper_before_immediate_handler(struct tevent_context *wrap_ev,
800 : void *private_data,
801 : struct tevent_context *main_ev,
802 : struct tevent_immediate *im,
803 : const char *handler_name,
804 : const char *location)
805 : {
806 4 : struct test_wrapper_state *state =
807 0 : talloc_get_type_abort(private_data,
808 : struct test_wrapper_state);
809 :
810 4 : torture_comment(state->tctx, "%s\n", __func__);
811 4 : state->num_wrap_handlers++;
812 4 : }
813 :
814 4 : static void test_wrapper_after_immediate_handler(struct tevent_context *wrap_ev,
815 : void *private_data,
816 : struct tevent_context *main_ev,
817 : struct tevent_immediate *im,
818 : const char *handler_name,
819 : const char *location)
820 : {
821 4 : struct test_wrapper_state *state =
822 0 : talloc_get_type_abort(private_data,
823 : struct test_wrapper_state);
824 :
825 4 : torture_comment(state->tctx, "%s\n", __func__);
826 4 : state->num_wrap_handlers++;
827 4 : }
828 :
829 8 : static void test_wrapper_before_signal_handler(struct tevent_context *wrap_ev,
830 : void *private_data,
831 : struct tevent_context *main_ev,
832 : struct tevent_signal *se,
833 : int signum,
834 : int count,
835 : void *siginfo,
836 : const char *handler_name,
837 : const char *location)
838 : {
839 8 : struct test_wrapper_state *state =
840 0 : talloc_get_type_abort(private_data,
841 : struct test_wrapper_state);
842 :
843 8 : torture_comment(state->tctx, "%s\n", __func__);
844 8 : state->num_wrap_handlers++;
845 8 : }
846 :
847 8 : static void test_wrapper_after_signal_handler(struct tevent_context *wrap_ev,
848 : void *private_data,
849 : struct tevent_context *main_ev,
850 : struct tevent_signal *se,
851 : int signum,
852 : int count,
853 : void *siginfo,
854 : const char *handler_name,
855 : const char *location)
856 : {
857 8 : struct test_wrapper_state *state =
858 0 : talloc_get_type_abort(private_data,
859 : struct test_wrapper_state);
860 :
861 8 : torture_comment(state->tctx, "%s\n", __func__);
862 8 : state->num_wrap_handlers++;
863 8 : }
864 :
865 : static const struct tevent_wrapper_ops test_wrapper_ops = {
866 : .name = "test_wrapper",
867 : .before_use = test_wrapper_before_use,
868 : .after_use = test_wrapper_after_use,
869 : .before_fd_handler = test_wrapper_before_fd_handler,
870 : .after_fd_handler = test_wrapper_after_fd_handler,
871 : .before_timer_handler = test_wrapper_before_timer_handler,
872 : .after_timer_handler = test_wrapper_after_timer_handler,
873 : .before_immediate_handler = test_wrapper_before_immediate_handler,
874 : .after_immediate_handler = test_wrapper_after_immediate_handler,
875 : .before_signal_handler = test_wrapper_before_signal_handler,
876 : .after_signal_handler = test_wrapper_after_signal_handler,
877 : };
878 :
879 4 : static void test_wrapper_timer_handler(struct tevent_context *ev,
880 : struct tevent_timer *te,
881 : struct timeval tv,
882 : void *private_data)
883 : {
884 4 : struct test_wrapper_state *state =
885 : (struct test_wrapper_state *)private_data;
886 :
887 :
888 4 : torture_comment(state->tctx, "timer handler\n");
889 :
890 4 : state->num_events++;
891 4 : talloc_free(te);
892 4 : return;
893 : }
894 :
895 4 : static void test_wrapper_fd_handler(struct tevent_context *ev,
896 : struct tevent_fd *fde,
897 : unsigned short fd_flags,
898 : void *private_data)
899 : {
900 4 : struct test_wrapper_state *state =
901 : (struct test_wrapper_state *)private_data;
902 :
903 4 : torture_comment(state->tctx, "fd handler\n");
904 :
905 4 : state->num_events++;
906 4 : talloc_free(fde);
907 4 : return;
908 : }
909 :
910 4 : static void test_wrapper_immediate_handler(struct tevent_context *ev,
911 : struct tevent_immediate *im,
912 : void *private_data)
913 : {
914 4 : struct test_wrapper_state *state =
915 : (struct test_wrapper_state *)private_data;
916 :
917 4 : state->num_events++;
918 4 : talloc_free(im);
919 :
920 4 : torture_comment(state->tctx, "immediate handler\n");
921 4 : return;
922 : }
923 :
924 4 : static void test_wrapper_signal_handler(struct tevent_context *ev,
925 : struct tevent_signal *se,
926 : int signum,
927 : int count,
928 : void *siginfo,
929 : void *private_data)
930 : {
931 4 : struct test_wrapper_state *state =
932 : (struct test_wrapper_state *)private_data;
933 :
934 4 : torture_comment(state->tctx, "signal handler\n");
935 :
936 4 : state->num_events++;
937 4 : talloc_free(se);
938 4 : return;
939 : }
940 :
941 4 : static bool test_wrapper(struct torture_context *tctx,
942 : const void *test_data)
943 : {
944 4 : struct test_wrapper_state *state = NULL;
945 4 : int sock[2] = { -1, -1};
946 4 : uint8_t c = 0;
947 4 : const int num_events = 4;
948 4 : const char *backend = (const char *)test_data;
949 4 : struct tevent_context *ev = NULL;
950 4 : struct tevent_context *wrap_ev = NULL;
951 4 : struct tevent_fd *fde = NULL;
952 4 : struct tevent_timer *te = NULL;
953 4 : struct tevent_signal *se = NULL;
954 4 : struct tevent_immediate *im = NULL;
955 : int ret;
956 4 : bool ok = false;
957 : bool ret2;
958 :
959 4 : ev = tevent_context_init_byname(tctx, backend);
960 4 : if (ev == NULL) {
961 0 : torture_skip(tctx, talloc_asprintf(tctx,
962 : "event backend '%s' not supported\n",
963 : backend));
964 : return true;
965 : }
966 :
967 4 : tevent_set_debug_stderr(ev);
968 4 : torture_comment(tctx, "tevent backend '%s'\n", backend);
969 :
970 4 : wrap_ev = tevent_context_wrapper_create(
971 : ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
972 4 : torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
973 : "tevent_context_wrapper_create failed\n");
974 4 : *state = (struct test_wrapper_state) {
975 : .tctx = tctx,
976 : };
977 :
978 4 : ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
979 4 : torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
980 :
981 4 : te = tevent_add_timer(wrap_ev, wrap_ev,
982 : timeval_current_ofs(0, 0),
983 : test_wrapper_timer_handler, state);
984 4 : torture_assert_not_null_goto(tctx, te, ok, done,
985 : "tevent_add_timer failed\n");
986 :
987 4 : fde = tevent_add_fd(wrap_ev, wrap_ev,
988 : sock[1],
989 : TEVENT_FD_READ,
990 : test_wrapper_fd_handler,
991 : state);
992 4 : torture_assert_not_null_goto(tctx, fde, ok, done,
993 : "tevent_add_fd failed\n");
994 :
995 4 : im = tevent_create_immediate(wrap_ev);
996 4 : torture_assert_not_null_goto(tctx, im, ok, done,
997 : "tevent_create_immediate failed\n");
998 :
999 4 : se = tevent_add_signal(wrap_ev, wrap_ev,
1000 : SIGUSR1,
1001 : 0,
1002 : test_wrapper_signal_handler,
1003 : state);
1004 4 : torture_assert_not_null_goto(tctx, se, ok, done,
1005 : "tevent_add_signal failed\n");
1006 :
1007 4 : do_write(sock[0], &c, 1);
1008 4 : kill(getpid(), SIGUSR1);
1009 4 : tevent_schedule_immediate(im,
1010 : wrap_ev,
1011 : test_wrapper_immediate_handler,
1012 : state);
1013 :
1014 4 : ret2 = tevent_context_push_use(wrap_ev);
1015 4 : torture_assert_goto(tctx, ret2, ok, done, "tevent_context_push_use(wrap_ev) failed\n");
1016 4 : ret2 = tevent_context_push_use(ev);
1017 4 : torture_assert_goto(tctx, ret2, ok, pop_use, "tevent_context_push_use(ev) failed\n");
1018 4 : tevent_context_pop_use(ev);
1019 4 : tevent_context_pop_use(wrap_ev);
1020 :
1021 4 : ret = tevent_loop_wait(ev);
1022 4 : torture_assert_int_equal_goto(tctx, ret, 0, ok, done, "tevent_loop_wait failed\n");
1023 :
1024 4 : torture_comment(tctx, "Num events: %d\n", state->num_events);
1025 4 : torture_comment(tctx, "Num wrap handlers: %d\n",
1026 4 : state->num_wrap_handlers);
1027 :
1028 4 : torture_assert_int_equal_goto(tctx, state->num_events, num_events, ok, done,
1029 : "Wrong event count\n");
1030 4 : torture_assert_int_equal_goto(tctx, state->num_wrap_handlers,
1031 : num_events*2+2,
1032 : ok, done, "Wrong wrapper count\n");
1033 :
1034 0 : ok = true;
1035 :
1036 4 : done:
1037 4 : TALLOC_FREE(wrap_ev);
1038 4 : TALLOC_FREE(ev);
1039 :
1040 4 : if (sock[0] != -1) {
1041 4 : close(sock[0]);
1042 : }
1043 4 : if (sock[1] != -1) {
1044 4 : close(sock[1]);
1045 : }
1046 0 : return ok;
1047 0 : pop_use:
1048 0 : tevent_context_pop_use(wrap_ev);
1049 0 : goto done;
1050 : }
1051 :
1052 4 : static void test_free_wrapper_signal_handler(struct tevent_context *ev,
1053 : struct tevent_signal *se,
1054 : int signum,
1055 : int count,
1056 : void *siginfo,
1057 : void *private_data)
1058 : {
1059 4 : struct torture_context *tctx =
1060 0 : talloc_get_type_abort(private_data,
1061 : struct torture_context);
1062 :
1063 4 : torture_comment(tctx, "signal handler\n");
1064 :
1065 4 : talloc_free(se);
1066 :
1067 : /*
1068 : * signal handlers have highest priority in tevent, so this signal
1069 : * handler will always be started before the other handlers
1070 : * below. Freeing the (wrapper) event context here tests that the
1071 : * wrapper implementation correclty handles the wrapper ev going away
1072 : * with pending events.
1073 : */
1074 4 : talloc_free(ev);
1075 4 : return;
1076 : }
1077 :
1078 0 : static void test_free_wrapper_fd_handler(struct tevent_context *ev,
1079 : struct tevent_fd *fde,
1080 : unsigned short fd_flags,
1081 : void *private_data)
1082 : {
1083 : /*
1084 : * This should never be called as
1085 : * test_free_wrapper_signal_handler()
1086 : * already destroyed the wrapper tevent_context.
1087 : */
1088 0 : abort();
1089 : }
1090 :
1091 0 : static void test_free_wrapper_immediate_handler(struct tevent_context *ev,
1092 : struct tevent_immediate *im,
1093 : void *private_data)
1094 : {
1095 : /*
1096 : * This should never be called as
1097 : * test_free_wrapper_signal_handler()
1098 : * already destroyed the wrapper tevent_context.
1099 : */
1100 0 : abort();
1101 : }
1102 :
1103 0 : static void test_free_wrapper_timer_handler(struct tevent_context *ev,
1104 : struct tevent_timer *te,
1105 : struct timeval tv,
1106 : void *private_data)
1107 : {
1108 : /*
1109 : * This should never be called as
1110 : * test_free_wrapper_signal_handler()
1111 : * already destroyed the wrapper tevent_context.
1112 : */
1113 0 : abort();
1114 : }
1115 :
1116 4 : static bool test_free_wrapper(struct torture_context *tctx,
1117 : const void *test_data)
1118 : {
1119 4 : struct test_wrapper_state *state = NULL;
1120 4 : int sock[2] = { -1, -1};
1121 4 : uint8_t c = 0;
1122 4 : const char *backend = (const char *)test_data;
1123 4 : TALLOC_CTX *frame = talloc_stackframe();
1124 4 : struct tevent_context *ev = NULL;
1125 4 : struct tevent_context *wrap_ev = NULL;
1126 4 : struct tevent_fd *fde = NULL;
1127 4 : struct tevent_timer *te = NULL;
1128 4 : struct tevent_signal *se = NULL;
1129 4 : struct tevent_immediate *im = NULL;
1130 : int ret;
1131 4 : bool ok = false;
1132 :
1133 4 : ev = tevent_context_init_byname(frame, backend);
1134 4 : if (ev == NULL) {
1135 0 : torture_skip(tctx, talloc_asprintf(tctx,
1136 : "event backend '%s' not supported\n",
1137 : backend));
1138 : return true;
1139 : }
1140 :
1141 4 : tevent_set_debug_stderr(ev);
1142 4 : torture_comment(tctx, "tevent backend '%s'\n", backend);
1143 :
1144 4 : wrap_ev = tevent_context_wrapper_create(
1145 : ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
1146 4 : torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
1147 : "tevent_context_wrapper_create failed\n");
1148 4 : *state = (struct test_wrapper_state) {
1149 : .tctx = tctx,
1150 : };
1151 :
1152 4 : ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
1153 4 : torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
1154 :
1155 4 : fde = tevent_add_fd(wrap_ev, frame,
1156 : sock[1],
1157 : TEVENT_FD_READ,
1158 : test_free_wrapper_fd_handler,
1159 : NULL);
1160 4 : torture_assert_not_null_goto(tctx, fde, ok, done,
1161 : "tevent_add_fd failed\n");
1162 :
1163 4 : te = tevent_add_timer(wrap_ev, frame,
1164 : timeval_current_ofs(0, 0),
1165 : test_free_wrapper_timer_handler, NULL);
1166 4 : torture_assert_not_null_goto(tctx, te, ok, done,
1167 : "tevent_add_timer failed\n");
1168 :
1169 4 : im = tevent_create_immediate(frame);
1170 4 : torture_assert_not_null_goto(tctx, im, ok, done,
1171 : "tevent_create_immediate failed\n");
1172 :
1173 4 : se = tevent_add_signal(wrap_ev, frame,
1174 : SIGUSR1,
1175 : 0,
1176 : test_free_wrapper_signal_handler,
1177 : tctx);
1178 4 : torture_assert_not_null_goto(tctx, se, ok, done,
1179 : "tevent_add_signal failed\n");
1180 :
1181 4 : do_write(sock[0], &c, 1);
1182 4 : kill(getpid(), SIGUSR1);
1183 4 : tevent_schedule_immediate(im,
1184 : wrap_ev,
1185 : test_free_wrapper_immediate_handler,
1186 : NULL);
1187 :
1188 4 : ret = tevent_loop_wait(ev);
1189 4 : torture_assert_goto(tctx, ret == 0, ok, done, "tevent_loop_wait failed\n");
1190 :
1191 0 : ok = true;
1192 :
1193 4 : done:
1194 4 : TALLOC_FREE(frame);
1195 :
1196 4 : if (sock[0] != -1) {
1197 4 : close(sock[0]);
1198 : }
1199 4 : if (sock[1] != -1) {
1200 4 : close(sock[1]);
1201 : }
1202 0 : return ok;
1203 : }
1204 :
1205 : #ifdef HAVE_PTHREAD
1206 :
1207 : static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
1208 : static bool do_shutdown = false;
1209 :
1210 6 : static void test_event_threaded_lock(void)
1211 : {
1212 : int ret;
1213 6 : ret = pthread_mutex_lock(&threaded_mutex);
1214 6 : assert(ret == 0);
1215 6 : }
1216 :
1217 6 : static void test_event_threaded_unlock(void)
1218 : {
1219 : int ret;
1220 6 : ret = pthread_mutex_unlock(&threaded_mutex);
1221 6 : assert(ret == 0);
1222 6 : }
1223 :
1224 12 : static void test_event_threaded_trace(enum tevent_trace_point point,
1225 : void *private_data)
1226 : {
1227 12 : switch (point) {
1228 3 : case TEVENT_TRACE_BEFORE_WAIT:
1229 3 : test_event_threaded_unlock();
1230 3 : break;
1231 3 : case TEVENT_TRACE_AFTER_WAIT:
1232 3 : test_event_threaded_lock();
1233 3 : break;
1234 0 : case TEVENT_TRACE_BEFORE_LOOP_ONCE:
1235 : case TEVENT_TRACE_AFTER_LOOP_ONCE:
1236 0 : break;
1237 : }
1238 12 : }
1239 :
1240 0 : static void test_event_threaded_timer(struct tevent_context *ev,
1241 : struct tevent_timer *te,
1242 : struct timeval current_time,
1243 : void *private_data)
1244 : {
1245 0 : return;
1246 : }
1247 :
1248 1 : static void *test_event_poll_thread(void *private_data)
1249 : {
1250 1 : struct tevent_context *ev = (struct tevent_context *)private_data;
1251 :
1252 1 : test_event_threaded_lock();
1253 :
1254 0 : while (true) {
1255 : int ret;
1256 3 : ret = tevent_loop_once(ev);
1257 3 : assert(ret == 0);
1258 3 : if (do_shutdown) {
1259 1 : test_event_threaded_unlock();
1260 1 : return NULL;
1261 : }
1262 : }
1263 :
1264 : }
1265 :
1266 2 : static void test_event_threaded_read_handler(struct tevent_context *ev,
1267 : struct tevent_fd *fde,
1268 : uint16_t flags,
1269 : void *private_data)
1270 : {
1271 2 : int *pfd = (int *)private_data;
1272 : char c;
1273 : ssize_t nread;
1274 :
1275 2 : if ((flags & TEVENT_FD_READ) == 0) {
1276 0 : return;
1277 : }
1278 :
1279 : do {
1280 4 : nread = read(*pfd, &c, 1);
1281 2 : } while ((nread == -1) && (errno == EINTR));
1282 :
1283 2 : assert(nread == 1);
1284 : }
1285 :
1286 1 : static bool test_event_context_threaded(struct torture_context *test,
1287 : const void *test_data)
1288 : {
1289 : struct tevent_context *ev;
1290 : struct tevent_timer *te;
1291 : struct tevent_fd *fde;
1292 : pthread_t poll_thread;
1293 : int fds[2];
1294 : int ret;
1295 1 : char c = 0;
1296 :
1297 1 : ev = tevent_context_init_byname(test, "poll_mt");
1298 1 : torture_assert(test, ev != NULL, "poll_mt not supported");
1299 :
1300 1 : tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
1301 :
1302 1 : te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
1303 : test_event_threaded_timer, NULL);
1304 1 : torture_assert(test, te != NULL, "Could not add timer");
1305 :
1306 1 : ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
1307 1 : torture_assert(test, ret == 0, "Could not create poll thread");
1308 :
1309 1 : ret = pipe(fds);
1310 1 : torture_assert(test, ret == 0, "Could not create pipe");
1311 :
1312 1 : poll(NULL, 0, 100);
1313 :
1314 1 : test_event_threaded_lock();
1315 :
1316 1 : fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
1317 : test_event_threaded_read_handler, &fds[0]);
1318 1 : torture_assert(test, fde != NULL, "Could not add fd event");
1319 :
1320 1 : test_event_threaded_unlock();
1321 :
1322 1 : poll(NULL, 0, 100);
1323 :
1324 1 : do_write(fds[1], &c, 1);
1325 :
1326 1 : poll(NULL, 0, 100);
1327 :
1328 1 : test_event_threaded_lock();
1329 1 : do_shutdown = true;
1330 1 : test_event_threaded_unlock();
1331 :
1332 1 : do_write(fds[1], &c, 1);
1333 :
1334 1 : ret = pthread_join(poll_thread, NULL);
1335 1 : torture_assert(test, ret == 0, "pthread_join failed");
1336 :
1337 0 : return true;
1338 : }
1339 :
1340 : #define NUM_TEVENT_THREADS 100
1341 :
1342 : /* Ugly, but needed for torture_comment... */
1343 : static struct torture_context *thread_test_ctx;
1344 : static pthread_t thread_map[NUM_TEVENT_THREADS];
1345 : static unsigned thread_counter;
1346 :
1347 : /* Called in master thread context */
1348 100 : static void callback_nowait(struct tevent_context *ev,
1349 : struct tevent_immediate *im,
1350 : void *private_ptr)
1351 : {
1352 100 : pthread_t *thread_id_ptr =
1353 0 : talloc_get_type_abort(private_ptr, pthread_t);
1354 : unsigned i;
1355 :
1356 5050 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1357 5050 : if (pthread_equal(*thread_id_ptr,
1358 : thread_map[i])) {
1359 0 : break;
1360 : }
1361 : }
1362 100 : torture_comment(thread_test_ctx,
1363 : "Callback %u from thread %u\n",
1364 : thread_counter,
1365 : i);
1366 100 : thread_counter++;
1367 100 : }
1368 :
1369 : /* Blast the master tevent_context with a callback, no waiting. */
1370 100 : static void *thread_fn_nowait(void *private_ptr)
1371 : {
1372 100 : struct tevent_thread_proxy *master_tp =
1373 0 : talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1374 : struct tevent_immediate *im;
1375 : pthread_t *thread_id_ptr;
1376 :
1377 100 : im = tevent_create_immediate(NULL);
1378 100 : if (im == NULL) {
1379 0 : return NULL;
1380 : }
1381 100 : thread_id_ptr = talloc(NULL, pthread_t);
1382 100 : if (thread_id_ptr == NULL) {
1383 0 : return NULL;
1384 : }
1385 100 : *thread_id_ptr = pthread_self();
1386 :
1387 100 : tevent_thread_proxy_schedule(master_tp,
1388 : &im,
1389 : callback_nowait,
1390 : &thread_id_ptr);
1391 100 : return NULL;
1392 : }
1393 :
1394 0 : static void timeout_fn(struct tevent_context *ev,
1395 : struct tevent_timer *te,
1396 : struct timeval tv, void *p)
1397 : {
1398 0 : thread_counter = NUM_TEVENT_THREADS * 10;
1399 0 : }
1400 :
1401 1 : static bool test_multi_tevent_threaded(struct torture_context *test,
1402 : const void *test_data)
1403 : {
1404 : unsigned i;
1405 : struct tevent_context *master_ev;
1406 : struct tevent_thread_proxy *tp;
1407 :
1408 1 : talloc_disable_null_tracking();
1409 :
1410 : /* Ugly global stuff. */
1411 1 : thread_test_ctx = test;
1412 1 : thread_counter = 0;
1413 :
1414 1 : master_ev = tevent_context_init(NULL);
1415 1 : if (master_ev == NULL) {
1416 0 : return false;
1417 : }
1418 1 : tevent_set_debug_stderr(master_ev);
1419 :
1420 1 : tp = tevent_thread_proxy_create(master_ev);
1421 1 : if (tp == NULL) {
1422 0 : torture_fail(test,
1423 : talloc_asprintf(test,
1424 : "tevent_thread_proxy_create failed\n"));
1425 : talloc_free(master_ev);
1426 : return false;
1427 : }
1428 :
1429 100 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1430 100 : int ret = pthread_create(&thread_map[i],
1431 : NULL,
1432 : thread_fn_nowait,
1433 : tp);
1434 100 : if (ret != 0) {
1435 0 : torture_fail(test,
1436 : talloc_asprintf(test,
1437 : "Failed to create thread %i, %d\n",
1438 : i, ret));
1439 : return false;
1440 : }
1441 : }
1442 :
1443 : /* Ensure we don't wait more than 10 seconds. */
1444 1 : tevent_add_timer(master_ev,
1445 : master_ev,
1446 : timeval_current_ofs(10,0),
1447 : timeout_fn,
1448 : NULL);
1449 :
1450 301 : while (thread_counter < NUM_TEVENT_THREADS) {
1451 299 : int ret = tevent_loop_once(master_ev);
1452 299 : torture_assert(test, ret == 0, "tevent_loop_once failed");
1453 : }
1454 :
1455 1 : torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1456 : "thread_counter fail\n");
1457 :
1458 1 : talloc_free(master_ev);
1459 1 : return true;
1460 : }
1461 :
1462 : struct reply_state {
1463 : struct tevent_thread_proxy *reply_tp;
1464 : pthread_t thread_id;
1465 : int *p_finished;
1466 : };
1467 :
1468 0 : static void thread_timeout_fn(struct tevent_context *ev,
1469 : struct tevent_timer *te,
1470 : struct timeval tv, void *p)
1471 : {
1472 0 : int *p_finished = (int *)p;
1473 :
1474 0 : *p_finished = 2;
1475 0 : }
1476 :
1477 : /* Called in child-thread context */
1478 100 : static void thread_callback(struct tevent_context *ev,
1479 : struct tevent_immediate *im,
1480 : void *private_ptr)
1481 : {
1482 100 : struct reply_state *rsp =
1483 0 : talloc_get_type_abort(private_ptr, struct reply_state);
1484 :
1485 100 : talloc_steal(ev, rsp);
1486 100 : *rsp->p_finished = 1;
1487 100 : }
1488 :
1489 : /* Called in master thread context */
1490 100 : static void master_callback(struct tevent_context *ev,
1491 : struct tevent_immediate *im,
1492 : void *private_ptr)
1493 : {
1494 100 : struct reply_state *rsp =
1495 100 : talloc_get_type_abort(private_ptr, struct reply_state);
1496 : unsigned i;
1497 :
1498 100 : talloc_steal(ev, rsp);
1499 :
1500 5050 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1501 5050 : if (pthread_equal(rsp->thread_id,
1502 : thread_map[i])) {
1503 0 : break;
1504 : }
1505 : }
1506 100 : torture_comment(thread_test_ctx,
1507 : "Callback %u from thread %u\n",
1508 : thread_counter,
1509 : i);
1510 : /* Now reply to the thread ! */
1511 100 : tevent_thread_proxy_schedule(rsp->reply_tp,
1512 : &im,
1513 : thread_callback,
1514 : &rsp);
1515 :
1516 100 : thread_counter++;
1517 100 : }
1518 :
1519 100 : static void *thread_fn_1(void *private_ptr)
1520 : {
1521 100 : struct tevent_thread_proxy *master_tp =
1522 0 : talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1523 : struct tevent_thread_proxy *tp;
1524 : struct tevent_immediate *im;
1525 : struct tevent_context *ev;
1526 : struct reply_state *rsp;
1527 100 : int finished = 0;
1528 : int ret;
1529 :
1530 100 : ev = tevent_context_init(NULL);
1531 100 : if (ev == NULL) {
1532 0 : return NULL;
1533 : }
1534 :
1535 100 : tp = tevent_thread_proxy_create(ev);
1536 100 : if (tp == NULL) {
1537 0 : talloc_free(ev);
1538 0 : return NULL;
1539 : }
1540 :
1541 100 : im = tevent_create_immediate(ev);
1542 100 : if (im == NULL) {
1543 0 : talloc_free(ev);
1544 0 : return NULL;
1545 : }
1546 :
1547 100 : rsp = talloc(ev, struct reply_state);
1548 100 : if (rsp == NULL) {
1549 0 : talloc_free(ev);
1550 0 : return NULL;
1551 : }
1552 :
1553 100 : rsp->thread_id = pthread_self();
1554 100 : rsp->reply_tp = tp;
1555 100 : rsp->p_finished = &finished;
1556 :
1557 : /* Introduce a little randomness into the mix.. */
1558 100 : usleep(random() % 7000);
1559 :
1560 100 : tevent_thread_proxy_schedule(master_tp,
1561 : &im,
1562 : master_callback,
1563 : &rsp);
1564 :
1565 : /* Ensure we don't wait more than 10 seconds. */
1566 100 : tevent_add_timer(ev,
1567 : ev,
1568 : timeval_current_ofs(10,0),
1569 : thread_timeout_fn,
1570 : &finished);
1571 :
1572 100 : while (finished == 0) {
1573 200 : ret = tevent_loop_once(ev);
1574 200 : assert(ret == 0);
1575 : }
1576 :
1577 100 : if (finished > 1) {
1578 : /* Timeout ! */
1579 0 : abort();
1580 : }
1581 :
1582 : /*
1583 : * NB. We should talloc_free(ev) here, but if we do
1584 : * we currently get hit by helgrind Fix #323432
1585 : * "When calling pthread_cond_destroy or pthread_mutex_destroy
1586 : * with initializers as argument Helgrind (incorrectly) reports errors."
1587 : *
1588 : * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1589 : * with-pthread-mutex-destroy-td47757.html
1590 : *
1591 : * Helgrind doesn't understand that the request/reply
1592 : * messages provide synchronization between the lock/unlock
1593 : * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1594 : * when the struct tevent_thread_proxy object is talloc_free'd.
1595 : *
1596 : * As a work-around for now return ev for the parent thread to free.
1597 : */
1598 0 : return ev;
1599 : }
1600 :
1601 1 : static bool test_multi_tevent_threaded_1(struct torture_context *test,
1602 : const void *test_data)
1603 : {
1604 : unsigned i;
1605 : struct tevent_context *master_ev;
1606 : struct tevent_thread_proxy *master_tp;
1607 : int ret;
1608 :
1609 1 : talloc_disable_null_tracking();
1610 :
1611 : /* Ugly global stuff. */
1612 1 : thread_test_ctx = test;
1613 1 : thread_counter = 0;
1614 :
1615 1 : master_ev = tevent_context_init(NULL);
1616 1 : if (master_ev == NULL) {
1617 0 : return false;
1618 : }
1619 1 : tevent_set_debug_stderr(master_ev);
1620 :
1621 1 : master_tp = tevent_thread_proxy_create(master_ev);
1622 1 : if (master_tp == NULL) {
1623 0 : torture_fail(test,
1624 : talloc_asprintf(test,
1625 : "tevent_thread_proxy_create failed\n"));
1626 : talloc_free(master_ev);
1627 : return false;
1628 : }
1629 :
1630 100 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1631 100 : ret = pthread_create(&thread_map[i],
1632 : NULL,
1633 : thread_fn_1,
1634 : master_tp);
1635 100 : if (ret != 0) {
1636 0 : torture_fail(test,
1637 : talloc_asprintf(test,
1638 : "Failed to create thread %i, %d\n",
1639 : i, ret));
1640 : return false;
1641 : }
1642 : }
1643 :
1644 186 : while (thread_counter < NUM_TEVENT_THREADS) {
1645 185 : ret = tevent_loop_once(master_ev);
1646 185 : torture_assert(test, ret == 0, "tevent_loop_once failed");
1647 : }
1648 :
1649 : /* Wait for all the threads to finish - join 'em. */
1650 100 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1651 : void *retval;
1652 100 : ret = pthread_join(thread_map[i], &retval);
1653 100 : torture_assert(test, ret == 0, "pthread_join failed");
1654 : /* Free the child thread event context. */
1655 100 : talloc_free(retval);
1656 : }
1657 :
1658 1 : talloc_free(master_ev);
1659 1 : return true;
1660 : }
1661 :
1662 : struct threaded_test_2 {
1663 : struct tevent_threaded_context *tctx;
1664 : struct tevent_immediate *im;
1665 : pthread_t thread_id;
1666 : };
1667 :
1668 : static void master_callback_2(struct tevent_context *ev,
1669 : struct tevent_immediate *im,
1670 : void *private_data);
1671 :
1672 100 : static void *thread_fn_2(void *private_data)
1673 : {
1674 100 : struct threaded_test_2 *state = private_data;
1675 :
1676 100 : state->thread_id = pthread_self();
1677 :
1678 100 : usleep(random() % 7000);
1679 :
1680 100 : tevent_threaded_schedule_immediate(
1681 : state->tctx, state->im, master_callback_2, state);
1682 :
1683 100 : return NULL;
1684 : }
1685 :
1686 100 : static void master_callback_2(struct tevent_context *ev,
1687 : struct tevent_immediate *im,
1688 : void *private_data)
1689 : {
1690 100 : struct threaded_test_2 *state = private_data;
1691 : int i;
1692 :
1693 5050 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1694 5050 : if (pthread_equal(state->thread_id, thread_map[i])) {
1695 0 : break;
1696 : }
1697 : }
1698 100 : torture_comment(thread_test_ctx,
1699 : "Callback_2 %u from thread %u\n",
1700 : thread_counter,
1701 : i);
1702 100 : thread_counter++;
1703 100 : }
1704 :
1705 1 : static bool test_multi_tevent_threaded_2(struct torture_context *test,
1706 : const void *test_data)
1707 : {
1708 : unsigned i;
1709 :
1710 : struct tevent_context *ev;
1711 : struct tevent_threaded_context *tctx;
1712 : int ret;
1713 :
1714 1 : thread_test_ctx = test;
1715 1 : thread_counter = 0;
1716 :
1717 1 : ev = tevent_context_init(test);
1718 1 : torture_assert(test, ev != NULL, "tevent_context_init failed");
1719 :
1720 : /*
1721 : * tevent_re_initialise used to have a bug where it did not
1722 : * re-initialise the thread support after taking it
1723 : * down. Exercise that code path.
1724 : */
1725 1 : ret = tevent_re_initialise(ev);
1726 1 : torture_assert(test, ret == 0, "tevent_re_initialise failed");
1727 :
1728 1 : tctx = tevent_threaded_context_create(ev, ev);
1729 1 : torture_assert(test, tctx != NULL,
1730 : "tevent_threaded_context_create failed");
1731 :
1732 100 : for (i=0; i<NUM_TEVENT_THREADS; i++) {
1733 : struct threaded_test_2 *state;
1734 :
1735 100 : state = talloc(ev, struct threaded_test_2);
1736 100 : torture_assert(test, state != NULL, "talloc failed");
1737 :
1738 100 : state->tctx = tctx;
1739 100 : state->im = tevent_create_immediate(state);
1740 100 : torture_assert(test, state->im != NULL,
1741 : "tevent_create_immediate failed");
1742 :
1743 100 : ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1744 100 : torture_assert(test, ret == 0, "pthread_create failed");
1745 : }
1746 :
1747 163 : while (thread_counter < NUM_TEVENT_THREADS) {
1748 162 : ret = tevent_loop_once(ev);
1749 162 : torture_assert(test, ret == 0, "tevent_loop_once failed");
1750 : }
1751 :
1752 : /* Wait for all the threads to finish - join 'em. */
1753 100 : for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1754 : void *retval;
1755 100 : ret = pthread_join(thread_map[i], &retval);
1756 100 : torture_assert(test, ret == 0, "pthread_join failed");
1757 : /* Free the child thread event context. */
1758 : }
1759 :
1760 1 : talloc_free(tctx);
1761 1 : talloc_free(ev);
1762 1 : return true;
1763 : }
1764 : #endif
1765 :
1766 2355 : struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1767 : {
1768 2355 : struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1769 2355 : const char **list = tevent_backend_list(suite);
1770 : int i;
1771 :
1772 11775 : for (i=0;list && list[i];i++) {
1773 : struct torture_suite *backend_suite;
1774 :
1775 9420 : backend_suite = torture_suite_create(mem_ctx, list[i]);
1776 :
1777 9420 : torture_suite_add_simple_tcase_const(backend_suite,
1778 : "context",
1779 : test_event_context,
1780 8928 : (const void *)list[i]);
1781 9420 : torture_suite_add_simple_tcase_const(backend_suite,
1782 : "fd1",
1783 : test_event_fd1,
1784 8928 : (const void *)list[i]);
1785 9420 : torture_suite_add_simple_tcase_const(backend_suite,
1786 : "fd2",
1787 : test_event_fd2,
1788 8928 : (const void *)list[i]);
1789 9420 : torture_suite_add_simple_tcase_const(backend_suite,
1790 : "wrapper",
1791 : test_wrapper,
1792 8928 : (const void *)list[i]);
1793 9420 : torture_suite_add_simple_tcase_const(backend_suite,
1794 : "free_wrapper",
1795 : test_free_wrapper,
1796 8928 : (const void *)list[i]);
1797 :
1798 9420 : torture_suite_add_suite(suite, backend_suite);
1799 : }
1800 :
1801 : #ifdef HAVE_PTHREAD
1802 2355 : torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1803 : test_event_context_threaded,
1804 : NULL);
1805 :
1806 2355 : torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1807 : test_multi_tevent_threaded,
1808 : NULL);
1809 :
1810 2355 : torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1811 : test_multi_tevent_threaded_1,
1812 : NULL);
1813 :
1814 2355 : torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1815 : test_multi_tevent_threaded_2,
1816 : NULL);
1817 :
1818 : #endif
1819 :
1820 2355 : return suite;
1821 : }
|