LCOV - code coverage report
Current view: top level - source3/lib - messages.c (source / functions) Hit Total Coverage
Test: coverage report for master 2b515b7d Lines: 468 586 79.9 %
Date: 2024-02-28 12:06:22 Functions: 38 42 90.5 %

          Line data    Source code
       1             : /* 
       2             :    Unix SMB/CIFS implementation.
       3             :    Samba internal messaging functions
       4             :    Copyright (C) Andrew Tridgell 2000
       5             :    Copyright (C) 2001 by Martin Pool
       6             :    Copyright (C) 2002 by Jeremy Allison
       7             :    Copyright (C) 2007 by Volker Lendecke
       8             : 
       9             :    This program is free software; you can redistribute it and/or modify
      10             :    it under the terms of the GNU General Public License as published by
      11             :    the Free Software Foundation; either version 3 of the License, or
      12             :    (at your option) any later version.
      13             : 
      14             :    This program is distributed in the hope that it will be useful,
      15             :    but WITHOUT ANY WARRANTY; without even the implied warranty of
      16             :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      17             :    GNU General Public License for more details.
      18             : 
      19             :    You should have received a copy of the GNU General Public License
      20             :    along with this program.  If not, see <http://www.gnu.org/licenses/>.
      21             : */
      22             : 
      23             : /**
      24             :   @defgroup messages Internal messaging framework
      25             :   @{
      26             :   @file messages.c
      27             : 
      28             :   @brief  Module for internal messaging between Samba daemons. 
      29             : 
      30             :    The idea is that if a part of Samba wants to communicate with
      31             :    another Samba process then it will do a messaging_register() of a
      32             :    dispatch function, and use messaging_send() to send messages to
      33             :    that process.
      34             : 
      35             :    The dispatch function is given the server_id of the sender, and it
      36             :    can use that to reply by messaging_send().  See ping_message() for a
      37             :    simple example.
      38             : 
      39             :    @caution Dispatch functions must be able to cope with incoming
      40             :    messages on an *odd* byte boundary.
      41             : 
      42             :    This system doesn't have any inherent size limitations but is not
      43             :    very efficient for large messages or when messages are sent in very
      44             :    quick succession.
      45             : 
      46             : */
      47             : 
      48             : #include "includes.h"
      49             : #include "lib/util/server_id.h"
      50             : #include "dbwrap/dbwrap.h"
      51             : #include "serverid.h"
      52             : #include "messages.h"
      53             : #include "lib/util/tevent_unix.h"
      54             : #include "lib/background.h"
      55             : #include "lib/messaging/messages_dgm.h"
      56             : #include "lib/util/iov_buf.h"
      57             : #include "lib/util/server_id_db.h"
      58             : #include "lib/messaging/messages_dgm_ref.h"
      59             : #include "lib/messages_ctdb.h"
      60             : #include "lib/messages_ctdb_ref.h"
      61             : #include "lib/messages_util.h"
      62             : #include "cluster_support.h"
      63             : #include "ctdbd_conn.h"
      64             : #include "ctdb_srvids.h"
      65             : 
      66             : #ifdef CLUSTER_SUPPORT
      67             : #include "ctdb_protocol.h"
      68             : #endif
      69             : 
      70             : struct messaging_callback {
                               /*
                                * One message handler entry: a message type, the function
                                * to call for it and an opaque pointer stored next to it.
                                */
      71             :         struct messaging_callback *prev, *next; /* links to neighbouring entries */
      72             :         uint32_t msg_type; /* presumably the type this entry is registered for - confirm */
                               /* Same signature as ping_message() in this file. */
      73             :         void (*fn)(struct messaging_context *msg, void *private_data, 
      74             :                    uint32_t msg_type, 
      75             :                    struct server_id server_id, DATA_BLOB *data);
      76             :         void *private_data; /* presumably handed back to fn as its second argument - confirm */
      77             : };
      78             : 
      79             : struct messaging_registered_ev {
                               /*
                                * One slot of messaging_context.event_contexts: an event
                                * context together with the immediate used to wake it up.
                                */
      80             :         struct tevent_context *ev; /* registered context; reset to NULL once refcount hits 0 */
      81             :         struct tevent_immediate *im; /* created once per slot and kept when the slot is recycled */
      82             :         size_t refcount; /* outstanding registrations of ev; 0 marks the slot as reusable */
      83             : };
      84             : 
      85             : struct messaging_context {
                               /* Per-context state of the messaging subsystem. */
      86             :         struct server_id id; /* own id; pid and vnn are filled in by messaging_init_internal() */
      87             :         struct tevent_context *event_ctx; /* primary event context; never deregistered */
      88             :         struct messaging_callback *callbacks; /* presumably the registered handlers - confirm */
      89             : 
      90             :         struct messaging_rec *posted_msgs; /* list drained by the immediate handlers below */
      91             : 
      92             :         struct messaging_registered_ev *event_contexts; /* talloc array, see messaging_register_event_context() */
      93             : 
                               /*
                                * Two arrays of pending requests with their element counts.
                                * The visible code only shows the destructor walking both;
                                * how entries move between them is defined further down
                                * in the file.
                                */
      94             :         struct tevent_req **new_waiters;
      95             :         size_t num_new_waiters;
      96             : 
      97             :         struct tevent_req **waiters;
      98             :         size_t num_waiters;
      99             : 
     100             :         struct server_id_db *names_db; /* created with server_id_db_init() during init */
     101             : 
     102             :         TALLOC_CTX *per_process_talloc_ctx; /* talloc child of the context, parent of the dgm/ctdb refs */
     103             : };
     104             : 
     105             : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
     106             :                                                struct messaging_rec *rec); /* used by messaging_rec_create() */
     107             : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
     108             :                                        struct messaging_rec *rec); /* callers treat the result as "record consumed" */
     109             : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
     110             :                                        struct tevent_context *ev,
     111             :                                        struct messaging_rec *rec); /* callers treat the result as "record consumed" */
     112             : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
     113             :                                    struct tevent_context *ev,
     114             :                                    struct messaging_rec *rec); /* used by messaging_recv_cb() */
     115             : 
     116             : /****************************************************************************
     117             :  A useful function for testing the message system.
     118             : ****************************************************************************/
     119             : 
     120         105 : static void ping_message(struct messaging_context *msg_ctx,
     121             :                          void *private_data,
     122             :                          uint32_t msg_type,
     123             :                          struct server_id src,
     124             :                          DATA_BLOB *data)
     125             : {
                               /*
                                * Handler registered for MSG_PING: log the sender and the
                                * payload at debug level 1, then send the same blob back to
                                * the sender as MSG_PONG. private_data and msg_type are
                                * not used.
                                */
     126         105 :         struct server_id_buf idbuf;
     127             : 
                               /*
                                * The precision argument limits the print to data->length
                                * bytes; an empty string is substituted when data->data
                                * is NULL.
                                */
     128         210 :         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
     129             :                   server_id_str_buf(src, &idbuf), (int)data->length,
     130             :                   data->data ? (char *)data->data : ""));
     131             : 
     132         105 :         messaging_send(msg_ctx, src, MSG_PONG, data);
     133         105 : }
     134             : 
     135      183488 : struct messaging_rec *messaging_rec_create(
     136             :         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
     137             :         uint32_t msg_type, const struct iovec *iov, int iovlen,
     138             :         const int *fds, size_t num_fds)
     139             : {
                               /*
                                * Build a messaging_rec from a scatter/gather payload and
                                * an fd array.
                                *
                                * Returns NULL when num_fds exceeds INT8_MAX, when
                                * iov_buflen() reports -1, or when the flat buffer cannot
                                * be allocated; otherwise returns whatever
                                * messaging_rec_dup() returns.
                                */
     140         369 :         ssize_t buflen;
     141         369 :         uint8_t *buf;
     142         369 :         struct messaging_rec *result;
     143             : 
     144      183488 :         if (num_fds > INT8_MAX) {
     145           0 :                 return NULL;
     146             :         }
     147             : 
                               /* Flatten the iovec into one temporary buffer. */
     148      183488 :         buflen = iov_buflen(iov, iovlen);
     149      183488 :         if (buflen == -1) {
     150           0 :                 return NULL;
     151             :         }
     152      183488 :         buf = talloc_array(mem_ctx, uint8_t, buflen);
     153      183488 :         if (buf == NULL) {
     154           0 :                 return NULL;
     155             :         }
     156      183488 :         iov_buf(iov, iovlen, buf, buflen);
     157             : 
     158      183488 :         {
     159         369 :                 struct messaging_rec rec;
                                       /* MAX(1, ...) keeps the array length non-zero when num_fds is 0. */
     160      183488 :                 int64_t fds64[MAX(1, num_fds)];
     161         369 :                 size_t i;
     162             : 
                                       /* Widen the int fds to the int64_t form stored in the record. */
     163      183489 :                 for (i=0; i<num_fds; i++) {
     164           1 :                         fds64[i] = fds[i];
     165             :                 }
     166             : 
     167      183488 :                 rec = (struct messaging_rec) {
     168             :                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
     169             :                         .src = src, .dest = dst,
     170             :                         .buf.data = buf, .buf.length = buflen,
     171             :                         .num_fds = num_fds, .fds = fds64,
     172             :                 };
     173             : 
                                       /*
                                        * rec only points at temporaries (buf is freed below and
                                        * fds64 leaves scope), so the duplicate presumably
                                        * carries its own copies - confirm in messaging_rec_dup().
                                        */
     174      183488 :                 result = messaging_rec_dup(mem_ctx, &rec);
     175             :         }
     176             : 
     177      183488 :         TALLOC_FREE(buf);
     178             : 
     179      183488 :         return result;
     180             : }
     181             : 
     182      159078 : static bool messaging_register_event_context(struct messaging_context *ctx,
     183             :                                              struct tevent_context *ev)
     184             : {
                               /*
                                * Take one reference on ev in ctx->event_contexts.
                                *
                                * If ev is already registered its refcount is incremented.
                                * Otherwise a slot with refcount 0 is reused, or the array
                                * is grown by one element with a fresh immediate.
                                *
                                * Returns false only if creating the immediate or growing
                                * the array fails.
                                */
     185        3937 :         size_t i, num_event_contexts;
     186      159078 :         struct messaging_registered_ev *free_reg = NULL;
     187        3937 :         struct messaging_registered_ev *tmp;
     188             : 
     189      159078 :         num_event_contexts = talloc_array_length(ctx->event_contexts);
     190             : 
     191      159114 :         for (i=0; i<num_event_contexts; i++) {
     192      153805 :                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
     193             : 
     194      153805 :                 if (reg->refcount == 0) {
                                               /* A released slot must have had its ev cleared. */
     195           5 :                         if (reg->ev != NULL) {
     196           0 :                                 abort();
     197             :                         }
     198           5 :                         free_reg = reg;
     199             :                         /*
     200             :                          * We continue here and may find another
     201             :                          * free_reg, but the important thing is
     202             :                          * that we continue to search for an
     203             :                          * existing registration in the loop.
     204             :                          */
     205           5 :                         continue;
     206             :                 }
     207             : 
     208      153800 :                 if (reg->ev == ev) {
     209      153769 :                         reg->refcount += 1;
     210      153769 :                         return true;
     211             :                 }
     212             :         }
     213             : 
     214        5309 :         if (free_reg == NULL) {
     215        5304 :                 struct tevent_immediate *im = NULL;
     216             : 
     217        5304 :                 im = tevent_create_immediate(ctx);
     218        5304 :                 if (im == NULL) {
     219           0 :                         return false;
     220             :                 }
     221             : 
     222        5304 :                 tmp = talloc_realloc(ctx, ctx->event_contexts,
     223             :                                      struct messaging_registered_ev,
     224             :                                      num_event_contexts+1);
     225        5304 :                 if (tmp == NULL) {
                                               /*
                                                * NOTE(review): im is not released on this path. It
                                                * was created with ctx as the parent argument, so it
                                                * presumably goes away together with ctx - confirm.
                                                */
     226           0 :                         return false;
     227             :                 }
     228        5304 :                 ctx->event_contexts = tmp;
     229             : 
                                       /* Use the newly appended element and hang im below the array. */
     230        5304 :                 free_reg = &ctx->event_contexts[num_event_contexts];
     231        5304 :                 free_reg->im = talloc_move(ctx->event_contexts, &im);
     232             :         }
     233             : 
     234             :         /*
     235             :          * free_reg->im might be cached
     236             :          */
     237        5309 :         free_reg->ev = ev;
     238        5309 :         free_reg->refcount = 1;
     239             : 
     240        5309 :         return true;
     241             : }
     242             : 
     243      147204 : static bool messaging_deregister_event_context(struct messaging_context *ctx,
     244             :                                                struct tevent_context *ev)
     245             : {
                               /*
                                * Drop one reference on ev in ctx->event_contexts.
                                *
                                * Returns true if a live registration for ev was found and
                                * false otherwise. When the count reaches zero the slot is
                                * kept for reuse: ev is cleared and the immediate is
                                * rescheduled on the primary context with a NULL handler.
                                */
     246        3665 :         size_t i, num_event_contexts;
     247             : 
     248      147204 :         num_event_contexts = talloc_array_length(ctx->event_contexts);
     249             : 
     250      147235 :         for (i=0; i<num_event_contexts; i++) {
     251      147235 :                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
     252             : 
                                       /* Skip slots that are already released. */
     253      147235 :                 if (reg->refcount == 0) {
     254           0 :                         continue;
     255             :                 }
     256             : 
     257      147235 :                 if (reg->ev == ev) {
     258      147204 :                         reg->refcount -= 1;
     259             : 
     260      147204 :                         if (reg->refcount == 0) {
     261             :                                 /*
     262             :                                  * The primary event context
     263             :                                  * is never unregistered using
     264             :                                  * messaging_deregister_event_context()
     265             :                                  * it's only registered using
     266             :                                  * messaging_register_event_context().
     267             :                                  */
     268          31 :                                 SMB_ASSERT(ev != ctx->event_ctx);
     269          31 :                                 SMB_ASSERT(reg->ev != ctx->event_ctx);
     270             : 
     271             :                                 /*
     272             :                                  * Not strictly necessary, just
     273             :                                  * paranoia
     274             :                                  */
     275          31 :                                 reg->ev = NULL;
     276             : 
     277             :                                 /*
     278             :                                  * Do not talloc_free(reg->im),
     279             :                                  * recycle immediate events.
     280             :                                  *
     281             :                                  * We just invalidate it using
     282             :                                  * the primary event context,
     283             :                                  * which is never unregistered.
     284             :                                  */
     285          31 :                                 tevent_schedule_immediate(reg->im,
     286             :                                                           ctx->event_ctx,
     287        3665 :                                                           NULL, NULL);
     288             :                         }
     289      147204 :                         return true;
     290             :                 }
     291             :         }
                               /* ev was not registered. */
     292           0 :         return false;
     293             : }
     294             : 
     295      182995 : static void messaging_post_main_event_context(struct tevent_context *ev,
     296             :                                               struct tevent_immediate *im,
     297             :                                               void *private_data)
     298             : {
                               /*
                                * Immediate handler scheduled for the primary event context
                                * by messaging_alert_event_contexts(). private_data is the
                                * messaging_context.
                                *
                                * Drains ctx->posted_msgs completely: every record is
                                * unlinked, offered to messaging_dispatch_classic() and, if
                                * not consumed, to messaging_dispatch_waiters() on the
                                * primary context, and then freed.
                                */
     299      182995 :         struct messaging_context *ctx = talloc_get_type_abort(
     300             :                 private_data, struct messaging_context);
     301             : 
     302      394756 :         while (ctx->posted_msgs != NULL) {
     303      183429 :                 struct messaging_rec *rec = ctx->posted_msgs;
     304         366 :                 bool consumed;
     305             : 
     306      183429 :                 DLIST_REMOVE(ctx->posted_msgs, rec);
     307             : 
     308      183429 :                 consumed = messaging_dispatch_classic(ctx, rec);
     309      183425 :                 if (!consumed) {
     310       21929 :                         consumed = messaging_dispatch_waiters(
     311             :                                 ctx, ctx->event_ctx, rec);
     312             :                 }
     313             : 
                                       /* Nobody took the record: close the fds it carries. */
     314      183425 :                 if (!consumed) {
     315             :                         uint8_t i;
     316             : 
     317       20484 :                         for (i=0; i<rec->num_fds; i++) {
     318           0 :                                 close(rec->fds[i]);
     319             :                         }
     320             :                 }
     321             : 
     322      183778 :                 TALLOC_FREE(rec);
     323             :         }
     324      182991 : }
     325             : 
     326           0 : static void messaging_post_sub_event_context(struct tevent_context *ev,
     327             :                                              struct tevent_immediate *im,
     328             :                                              void *private_data)
     329             : {
                               /*
                                * Immediate handler scheduled for every registered event
                                * context other than the primary one. private_data is the
                                * messaging_context.
                                *
                                * Offers each posted record to messaging_dispatch_waiters()
                                * for ev. Only consumed records are unlinked and freed;
                                * the others stay on ctx->posted_msgs.
                                */
     330           0 :         struct messaging_context *ctx = talloc_get_type_abort(
     331             :                 private_data, struct messaging_context);
     332           0 :         struct messaging_rec *rec, *next;
     333             : 
     334           0 :         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
     335           0 :                 bool consumed;
     336             : 
                                       /* Fetch the successor first, rec may be freed below. */
     337           0 :                 next = rec->next;
     338             : 
     339           0 :                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
     340           0 :                 if (consumed) {
     341           0 :                         DLIST_REMOVE(ctx->posted_msgs, rec);
     342           0 :                         TALLOC_FREE(rec);
     343             :                 }
     344             :         }
     345           0 : }
     346             : 
     347      183488 : static bool messaging_alert_event_contexts(struct messaging_context *ctx)
     348             : {
                               /*
                                * Schedule the immediate of every live registration in
                                * ctx->event_contexts. The primary context gets
                                * messaging_post_main_event_context(), all others get
                                * messaging_post_sub_event_context(). Always returns true.
                                */
     349         369 :         size_t i, num_event_contexts;
     350             : 
     351      183488 :         num_event_contexts = talloc_array_length(ctx->event_contexts);
     352             : 
     353      367004 :         for (i=0; i<num_event_contexts; i++) {
     354      183516 :                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
     355             : 
                                       /* Released slots have nothing to wake up. */
     356      183516 :                 if (reg->refcount == 0) {
     357          23 :                         continue;
     358             :                 }
     359             : 
     360             :                 /*
     361             :                  * We depend on schedule_immediate to work
     362             :                  * multiple times. Might be a bit inefficient,
     363             :                  * but this needs to be proven in tests. The
     364             :                  * alternative would be to track whether the
     365             :                  * immediate has already been scheduled. For
     366             :                  * now, avoid that complexity here.
     367             :                  */
     368             : 
     369      183493 :                 if (reg->ev == ctx->event_ctx) {
     370      183488 :                         tevent_schedule_immediate(
     371             :                                 reg->im, reg->ev,
     372             :                                 messaging_post_main_event_context,
     373         369 :                                 ctx);
     374             :                 } else {
     375           5 :                         tevent_schedule_immediate(
     376             :                                 reg->im, reg->ev,
     377             :                                 messaging_post_sub_event_context,
     378         371 :                                 ctx);
     379             :                 }
     380             : 
     381             :         }
     382      183488 :         return true;
     383             : }
     384             : 
     385       85770 : static void messaging_recv_cb(struct tevent_context *ev,
     386             :                               const uint8_t *msg, size_t msg_len,
     387             :                               int *fds, size_t num_fds,
     388             :                               void *private_data)
     389       85770 : {
                               /*
                                * Receive callback passed to messaging_dgm_ref() (and to
                                * messaging_ctdb_ref() under CLUSTER_SUPPORT). private_data
                                * is the messaging_context.
                                *
                                * Wraps the raw message in a stack messaging_rec and hands
                                * it to messaging_dispatch_rec(). Messages shorter than
                                * MESSAGE_HDR_LENGTH, with more than INT8_MAX fds, or sent
                                * by this same process are dropped before dispatch.
                                */
     390       85770 :         struct messaging_context *msg_ctx = talloc_get_type_abort(
     391             :                 private_data, struct messaging_context);
     392         345 :         struct server_id_buf idbuf;
     393         345 :         struct messaging_rec rec;
                               /*
                                * Declared before num_fds is validated, hence the clamp:
                                * the length is always between 1 and INT8_MAX.
                                */
     394       85770 :         int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
     395         345 :         size_t i;
     396             : 
     397       85770 :         if (msg_len < MESSAGE_HDR_LENGTH) {
     398           0 :                 DBG_WARNING("message too short: %zu\n", msg_len);
     399           0 :                 return;
     400             :         }
     401             : 
     402       85770 :         if (num_fds > INT8_MAX) {
     403           0 :                 DBG_WARNING("too many fds: %zu\n", num_fds);
     404           0 :                 return;
     405             :         }
     406             : 
     407      121805 :         for (i=0; i < num_fds; i++) {
     408       36035 :                 fds64[i] = fds[i];
     409             :         }
     410             : 
                               /* The payload is not copied: buf points just past the header inside msg. */
     411       85770 :         rec = (struct messaging_rec) {
     412             :                 .msg_version = MESSAGE_VERSION,
     413       85770 :                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
     414       85770 :                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
     415             :                 .num_fds = num_fds,
     416             :                 .fds = fds64,
     417             :         };
     418             : 
                               /* Type, source and destination come from the message header. */
     419       85770 :         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
     420             : 
     421       85770 :         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
     422             :                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
     423             :                   server_id_str_buf(rec.src, &idbuf));
     424             : 
     425       85770 :         if (server_id_same_process(&rec.src, &msg_ctx->id)) {
     426           0 :                 DBG_DEBUG("Ignoring self-send\n");
     427           0 :                 return;
     428             :         }
     429             : 
     430       85770 :         messaging_dispatch_rec(msg_ctx, ev, &rec);
     431             : 
                               /*
                                * Copy the fd values back into the caller's array.
                                * Presumably the dispatch code rewrites entries it has
                                * taken over so the caller can tell - confirm in
                                * messaging_dispatch_rec().
                                */
     432      122150 :         for (i=0; i<num_fds; i++) {
     433       36035 :                 fds[i] = fds64[i];
     434             :         }
     435             : }
     436             : 
     437       40031 : static int messaging_context_destructor(struct messaging_context *ctx)
     438             : {
                               /*
                                * talloc destructor installed on the context by
                                * messaging_init_internal(). Removes the cleanup function
                                * from every pending waiter request, forgets the requests
                                * and frees the registered event contexts. Returns 0.
                                */
     439         966 :         size_t i;
     440             : 
     441      227456 :         for (i=0; i<ctx->num_new_waiters; i++) {
     442      187425 :                 if (ctx->new_waiters[i] != NULL) {
     443       37580 :                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
     444       37580 :                         ctx->new_waiters[i] = NULL;
     445             :                 }
     446             :         }
     447       60040 :         for (i=0; i<ctx->num_waiters; i++) {
     448       20009 :                 if (ctx->waiters[i] != NULL) {
     449        2458 :                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
     450        2458 :                         ctx->waiters[i] = NULL;
     451             :                 }
     452             :         }
     453             : 
     454             :         /*
     455             :          * The immediates from messaging_alert_event_contexts
     456             :          * reference "ctx". Don't let them outlive the
     457             :          * messaging_context we're destroying here.
     458             :          */
     459       40031 :         TALLOC_FREE(ctx->event_contexts);
     460             : 
     461       40031 :         return 0;
     462             : }
     463             : 
     464       40930 : static const char *private_path(const char *name)
     465             : {
                               /*
                                * Return lp_private_dir() and name joined with a slash.
                                * The string is allocated on talloc_tos(); the caller in
                                * messaging_init_internal() checks the result for NULL.
                                */
     466       40930 :         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
     467             : }
     468             : 
     469        5278 : static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
     470             :                                         struct tevent_context *ev,
     471             :                                         struct messaging_context **pmsg_ctx)
     472             : {
     473         137 :         TALLOC_CTX *frame;
     474         137 :         struct messaging_context *ctx;
     475         137 :         NTSTATUS status;
     476         137 :         int ret;
     477         137 :         const char *lck_path;
     478         137 :         const char *priv_path;
     479         137 :         void *ref;
     480         137 :         bool ok;
     481             : 
     482             :         /*
     483             :          * sec_init() *must* be called before any other
     484             :          * functions that use sec_XXX(). e.g. sec_initial_uid().
     485             :          */
     486             : 
     487        5278 :         sec_init();
     488             : 
     489        5278 :         lck_path = lock_path(talloc_tos(), "msg.lock");
     490        5278 :         if (lck_path == NULL) {
     491           0 :                 return NT_STATUS_NO_MEMORY;
     492             :         }
     493             : 
     494        5278 :         ok = directory_create_or_exist_strict(lck_path,
     495             :                                               sec_initial_uid(),
     496             :                                               0755);
     497        5278 :         if (!ok) {
     498           0 :                 DBG_DEBUG("Could not create lock directory: %s\n",
     499             :                           strerror(errno));
     500           0 :                 return NT_STATUS_ACCESS_DENIED;
     501             :         }
     502             : 
     503        5278 :         priv_path = private_path("msg.sock");
     504        5278 :         if (priv_path == NULL) {
     505           0 :                 return NT_STATUS_NO_MEMORY;
     506             :         }
     507             : 
     508        5278 :         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
     509             :                                               0700);
     510        5278 :         if (!ok) {
     511           0 :                 DBG_DEBUG("Could not create msg directory: %s\n",
     512             :                           strerror(errno));
     513           0 :                 return NT_STATUS_ACCESS_DENIED;
     514             :         }
     515             : 
     516        5278 :         frame = talloc_stackframe();
     517        5278 :         if (frame == NULL) {
     518           0 :                 return NT_STATUS_NO_MEMORY;
     519             :         }
     520             : 
     521        5278 :         ctx = talloc_zero(frame, struct messaging_context);
     522        5278 :         if (ctx == NULL) {
     523           0 :                 status = NT_STATUS_NO_MEMORY;
     524           0 :                 goto done;
     525             :         }
     526             : 
     527        5415 :         ctx->id = (struct server_id) {
     528        5278 :                 .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
     529             :         };
     530             : 
     531        5278 :         ctx->event_ctx = ev;
     532             : 
     533        5278 :         ctx->per_process_talloc_ctx = talloc_new(ctx);
     534        5278 :         if (ctx->per_process_talloc_ctx == NULL) {
     535           0 :                 status = NT_STATUS_NO_MEMORY;
     536           0 :                 goto done;
     537             :         }
     538             : 
     539        5278 :         ok = messaging_register_event_context(ctx, ev);
     540        5278 :         if (!ok) {
     541           0 :                 status = NT_STATUS_NO_MEMORY;
     542           0 :                 goto done;
     543             :         }
     544             : 
     545        5278 :         ref = messaging_dgm_ref(
     546             :                 ctx->per_process_talloc_ctx,
     547             :                 ctx->event_ctx,
     548             :                 &ctx->id.unique_id,
     549             :                 priv_path,
     550             :                 lck_path,
     551             :                 messaging_recv_cb,
     552             :                 ctx,
     553             :                 &ret);
     554        5278 :         if (ref == NULL) {
     555           5 :                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
     556           5 :                 status = map_nt_error_from_unix(ret);
     557           5 :                 goto done;
     558             :         }
     559        5273 :         talloc_set_destructor(ctx, messaging_context_destructor);
     560             : 
     561             : #ifdef CLUSTER_SUPPORT
     562             :         if (lp_clustering()) {
     563             :                 ref = messaging_ctdb_ref(
     564             :                         ctx->per_process_talloc_ctx,
     565             :                         ctx->event_ctx,
     566             :                         lp_ctdbd_socket(),
     567             :                         lp_ctdb_timeout(),
     568             :                         ctx->id.unique_id,
     569             :                         messaging_recv_cb,
     570             :                         ctx,
     571             :                         &ret);
     572             :                 if (ref == NULL) {
     573             :                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
     574             :                                    strerror(ret));
     575             :                         status = map_nt_error_from_unix(ret);
     576             :                         goto done;
     577             :                 }
     578             :         }
     579             : #endif
     580             : 
     581        5273 :         ctx->id.vnn = get_my_vnn();
     582             : 
     583        5273 :         ctx->names_db = server_id_db_init(ctx,
     584             :                                           ctx->id,
     585             :                                           lp_lock_directory(),
     586             :                                           0,
     587             :                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
     588        5273 :         if (ctx->names_db == NULL) {
     589           0 :                 DBG_DEBUG("server_id_db_init failed\n");
     590           0 :                 status = NT_STATUS_NO_MEMORY;
     591           0 :                 goto done;
     592             :         }
     593             : 
     594        5273 :         messaging_register(ctx, NULL, MSG_PING, ping_message);
     595             : 
     596             :         /* Register some debugging related messages */
     597             : 
     598        5273 :         register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
     599        5273 :         register_dmalloc_msgs(ctx);
     600        5273 :         debug_register_msgs(ctx);
     601             : 
     602             :         {
     603         132 :                 struct server_id_buf tmp;
     604        5273 :                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
     605             :         }
     606             : 
     607        5273 :         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
     608             : 
     609        5273 :         status = NT_STATUS_OK;
     610        5278 : done:
     611        5278 :         TALLOC_FREE(frame);
     612             : 
     613        5278 :         return status;
     614             : }
     615             : 
                       /*
                        * Create a messaging context for the calling process.
                        *
                        * Thin wrapper around messaging_init_internal() that drops the
                        * NTSTATUS detail: the new context is returned on success, NULL on
                        * any failure. On success messaging_init_internal() has already
                        * talloc_steal()ed the context onto mem_ctx.
                        */
     616        5278 : struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
     617             :                                          struct tevent_context *ev)
     618             : {
     619        5278 :         struct messaging_context *ctx = NULL;
     620         137 :         NTSTATUS status;
     621             : 
     622        5278 :         status = messaging_init_internal(mem_ctx,
     623             :                                          ev,
     624             :                                          &ctx);
     625        5278 :         if (!NT_STATUS_IS_OK(status)) {
                               /* The specific error code is not reported to the caller. */
     626           0 :                 return NULL;
     627             :         }
     628             : 
     629        5273 :         return ctx;
     630             : }
     631             : 
                       /*
                        * Return a copy (by value) of the server_id stored in msg_ctx.
                        * msg_ctx must not be NULL; it is dereferenced unconditionally.
                        */
     632     6295345 : struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
     633             : {
     634     6295345 :         return msg_ctx->id;
     635             : }
     636             : 
     637             : /*
     638             :  * re-init after a fork
                        *
                        * Rebuilds the per-process parts of msg_ctx in this order:
                        *  - frees and recreates per_process_talloc_ctx,
                        *  - resets msg_ctx->id to the current pid, keeping the previous vnn
                        *    (all other members are zeroed by the compound literal),
                        *  - takes a new messaging_dgm reference and, when lp_clustering()
                        *    is true, a new messaging_ctdb reference,
                        *  - calls server_id_db_reinit() and register_msg_pool_usage().
                        *
                        * Returns NT_STATUS_OK on success, NT_STATUS_NO_MEMORY when an
                        * allocation fails, or the NTSTATUS mapped from the unix error
                        * reported by messaging_dgm_ref()/messaging_ctdb_ref().
                        *
                        * NOTE(review): the result of private_path("msg.sock") is passed on
                        * without a NULL check, unlike lck_path -- confirm whether
                        * private_path() can fail.
                        * NOTE(review): the lp_clustering() block here is not wrapped in
                        * #ifdef CLUSTER_SUPPORT, unlike the one in
                        * messaging_init_internal() -- presumably stubs exist for
                        * non-cluster builds; verify.
     639             :  */
     640       35652 : NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
     641             : {
     642         855 :         int ret;
     643         855 :         char *lck_path;
     644         855 :         void *ref;
     645             : 
     646       35652 :         TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
     647             : 
     648       35652 :         msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
     649       35652 :         if (msg_ctx->per_process_talloc_ctx == NULL) {
     650           0 :                 return NT_STATUS_NO_MEMORY;
     651             :         }
     652             : 
                               /*
                                * The right-hand side reads the old msg_ctx->id.vnn before the
                                * assignment replaces the whole struct.
                                */
     653       36507 :         msg_ctx->id = (struct server_id) {
     654       35652 :                 .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
     655             :         };
     656             : 
     657       35652 :         lck_path = lock_path(talloc_tos(), "msg.lock");
     658       35652 :         if (lck_path == NULL) {
     659           0 :                 return NT_STATUS_NO_MEMORY;
     660             :         }
     661             : 
                               /*
                                * unique_id is passed by pointer -- presumably filled in by
                                * messaging_dgm_ref(); confirm against its definition.
                                */
     662       35652 :         ref = messaging_dgm_ref(
     663             :                 msg_ctx->per_process_talloc_ctx,
     664             :                 msg_ctx->event_ctx,
     665             :                 &msg_ctx->id.unique_id,
     666             :                 private_path("msg.sock"),
     667             :                 lck_path,
     668             :                 messaging_recv_cb,
     669             :                 msg_ctx,
     670             :                 &ret);
     671             : 
     672       35652 :         if (ref == NULL) {
     673           0 :                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
     674           0 :                 return map_nt_error_from_unix(ret);
     675             :         }
     676             : 
     677       35652 :         if (lp_clustering()) {
     678           0 :                 ref = messaging_ctdb_ref(
     679             :                         msg_ctx->per_process_talloc_ctx,
     680             :                         msg_ctx->event_ctx,
     681             :                         lp_ctdbd_socket(),
     682             :                         lp_ctdb_timeout(),
     683             :                         msg_ctx->id.unique_id,
     684             :                         messaging_recv_cb,
     685             :                         msg_ctx,
     686             :                         &ret);
     687           0 :                 if (ref == NULL) {
     688           0 :                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
     689             :                                    strerror(ret));
     690           0 :                         return map_nt_error_from_unix(ret);
     691             :                 }
     692             :         }
     693             : 
     694       35652 :         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
     695       35652 :         register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
     696             : 
     697       35652 :         return NT_STATUS_OK;
     698             : }
     699             : 
     700             : 
     701             : /*
     702             :  * Register a dispatch function for a particular message type. Allow multiple
     703             :  * registrants
                        *
                        * A registration is identified by the pair (msg_type, private_data).
                        * Registering an existing pair only replaces its callback function;
                        * a new pair gets a callback record allocated on msg_ctx and added
                        * to msg_ctx->callbacks with DLIST_ADD.
                        *
                        * Returns NT_STATUS_OK, or NT_STATUS_NO_MEMORY if the callback record
                        * cannot be allocated.
     704             : */
     705      495932 : NTSTATUS messaging_register(struct messaging_context *msg_ctx,
     706             :                             void *private_data,
     707             :                             uint32_t msg_type,
     708             :                             void (*fn)(struct messaging_context *msg,
     709             :                                        void *private_data, 
     710             :                                        uint32_t msg_type, 
     711             :                                        struct server_id server_id,
     712             :                                        DATA_BLOB *data))
     713             : {
     714       11086 :         struct messaging_callback *cb;
     715             : 
     716      495932 :         DEBUG(5, ("Registering messaging pointer for type %u - "
     717             :                   "private_data=%p\n",
     718             :                   (unsigned)msg_type, private_data));
     719             : 
     720             :         /*
     721             :          * Only one callback per (msg_type, private_data) pair
     722             :          */
     723             : 
     724    11129758 :         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
     725             :                 /* we allow a second registration of the same message
     726             :                    type if it has a different private pointer. This is
     727             :                    needed in, for example, the internal notify code,
     728             :                    which creates a new notify context for each tree
     729             :                    connect, and expects to receive messages to each of
     730             :                    them. */
     731    10669266 :                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
     732       35440 :                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
     733             :                                   (unsigned)msg_type, private_data));
     734       35440 :                         cb->fn = fn;
                                               /*
                                                * The assignment below is a no-op: the condition
                                                * above already established that both pointers
                                                * are equal.
                                                */
     735       35440 :                         cb->private_data = private_data;
     736       35440 :                         return NT_STATUS_OK;
     737             :                 }
     738             :         }
     739             : 
     740      460492 :         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
     741           0 :                 return NT_STATUS_NO_MEMORY;
     742             :         }
     743             : 
     744      460492 :         cb->msg_type = msg_type;
     745      460492 :         cb->fn = fn;
     746      460492 :         cb->private_data = private_data;
     747             : 
     748      460492 :         DLIST_ADD(msg_ctx->callbacks, cb);
     749      460492 :         return NT_STATUS_OK;
     750             : }
     751             : 
     752             : /*
     753             :   De-register the function for a particular message type.
                       
                         Removes and frees every callback record whose msg_type and
                         private_data both match the arguments. The loop does not stop at the
                         first match. Nothing happens if no record matches.
     754             : */
     755      201562 : void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
     756             :                           void *private_data)
     757             : {
     758        4554 :         struct messaging_callback *cb, *next;
     759             : 
     760     5452279 :         for (cb = ctx->callbacks; cb; cb = next) {
                                       /* Save the successor first: cb may be freed below. */
     761     5250717 :                 next = cb->next;
     762     5250717 :                 if ((cb->msg_type == msg_type)
     763      201562 :                     && (cb->private_data == private_data)) {
     764      201562 :                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
     765             :                                   (unsigned)msg_type, private_data));
     766      201562 :                         DLIST_REMOVE(ctx->callbacks, cb);
     767      201562 :                         TALLOC_FREE(cb);
     768             :                 }
     769             :         }
     770      201562 : }
     771             : 
     772             : /*
     773             :   Send a message to a particular server
                       
                         Wraps the optional DATA_BLOB in a single iovec and forwards it to
                         messaging_send_iov() without file descriptors. When data is NULL the
                         iovec stays zeroed (NULL base, length 0) but is still passed with
                         iovlen 1. Returns the NTSTATUS from messaging_send_iov().
     774             : */
     775      236383 : NTSTATUS messaging_send(struct messaging_context *msg_ctx,
     776             :                         struct server_id server, uint32_t msg_type,
     777             :                         const DATA_BLOB *data)
     778             : {
     779      236383 :         struct iovec iov = {0};
     780             : 
     781      236383 :         if (data != NULL) {
     782      235830 :                 iov.iov_base = data->data;
     783      235830 :                 iov.iov_len = data->length;
     784         585 :         };
                               /* NOTE(review): the ';' after '}' above is a harmless empty statement. */
     785             : 
     786      236383 :         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
     787             : }
     788             : 
                       /*
                        * Convenience wrapper around messaging_send() for a raw byte buffer:
                        * buf/len are wrapped with data_blob_const() and passed on unchanged.
                        * Returns the NTSTATUS from messaging_send().
                        */
     789       17737 : NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
     790             :                             struct server_id server, uint32_t msg_type,
     791             :                             const uint8_t *buf, size_t len)
     792             : {
     793       17737 :         DATA_BLOB blob = data_blob_const(buf, len);
     794       17737 :         return messaging_send(msg_ctx, server, msg_type, &blob);
     795             : }
     796             : 
                       /*
                        * Queue a message for delivery to this same process.
                        *
                        * Builds a messaging_rec with messaging_rec_create(), calls
                        * messaging_alert_event_contexts() and then appends the record to
                        * msg_ctx->posted_msgs. Nothing is sent over a socket here.
                        *
                        * Returns 0 on success or ENOMEM (a unix errno value, not an
                        * NTSTATUS) if the record cannot be created or the alert fails; in
                        * the latter case the record is freed again.
                        */
     797      183488 : static int messaging_post_self(struct messaging_context *msg_ctx,
     798             :                                struct server_id src, struct server_id dst,
     799             :                                uint32_t msg_type,
     800             :                                const struct iovec *iov, int iovlen,
     801             :                                const int *fds, size_t num_fds)
     802             : {
     803         369 :         struct messaging_rec *rec;
     804         369 :         bool ok;
     805             : 
     806      183488 :         rec = messaging_rec_create(
     807             :                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
     808      183488 :         if (rec == NULL) {
     809           0 :                 return ENOMEM;
     810             :         }
     811             : 
     812      183488 :         ok = messaging_alert_event_contexts(msg_ctx);
     813      183488 :         if (!ok) {
     814           0 :                 TALLOC_FREE(rec);
     815           0 :                 return ENOMEM;
     816             :         }
     817             : 
     818      183488 :         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
     819             : 
     820      183119 :         return 0;
     821             : }
     822             : 
                       /*
                        * Send a message from an explicit source id to dst.
                        *
                        * Routing, in order:
                        *  - dst equal to msg_ctx->id: queued locally via
                        *    messaging_post_self();
                        *  - dst.vnn different from our vnn: sent with messaging_ctdb_send();
                        *    file descriptors are not supported on this path (ENOSYS);
                        *  - otherwise: sent with messaging_dgm_send(), retried once between
                        *    become_root()/unbecome_root() if the first attempt returns
                        *    EACCES.
                        *
                        * For the two remote paths a header built by message_hdr_put() is
                        * placed in front of the caller's iovec array.
                        *
                        * Returns 0 on success or a unix errno value: EINVAL for a
                        * disconnected dst or num_fds > INT8_MAX, ENOSYS as above, ENOENT in
                        * place of ECONNREFUSED, otherwise whatever the transport returned.
                        *
                        * NOTE(review): iov2 is a variable-length array sized from iovlen
                        * before any argument is validated -- assumes callers pass a small,
                        * non-negative iovlen; confirm.
                        */
     823      620810 : int messaging_send_iov_from(struct messaging_context *msg_ctx,
     824             :                             struct server_id src, struct server_id dst,
     825             :                             uint32_t msg_type,
     826             :                             const struct iovec *iov, int iovlen,
     827             :                             const int *fds, size_t num_fds)
     828      620810 : {
     829        1686 :         int ret;
     830        1686 :         uint8_t hdr[MESSAGE_HDR_LENGTH];
     831      620810 :         struct iovec iov2[iovlen+1];
     832             : 
     833      620810 :         if (server_id_is_disconnected(&dst)) {
     834           0 :                 return EINVAL;
     835             :         }
     836             : 
     837      620810 :         if (num_fds > INT8_MAX) {
     838           0 :                 return EINVAL;
     839             :         }
     840             : 
     841      620810 :         if (server_id_equal(&dst, &msg_ctx->id)) {
     842      183330 :                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
     843             :                                           iov, iovlen, fds, num_fds);
     844      183330 :                 return ret;
     845             :         }
     846             : 
                               /* Slot 0 carries the header; the caller's iovecs follow it. */
     847      437480 :         message_hdr_put(hdr, msg_type, src, dst);
     848      437480 :         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
     849      437480 :         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
     850             : 
     851      437480 :         if (dst.vnn != msg_ctx->id.vnn) {
     852           0 :                 if (num_fds > 0) {
     853           0 :                         return ENOSYS;
     854             :                 }
     855             : 
     856           0 :                 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
     857           0 :                 return ret;
     858             :         }
     859             : 
     860      437480 :         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
     861             : 
     862      437480 :         if (ret == EACCES) {
     863           0 :                 become_root();
     864           0 :                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
     865             :                                          fds, num_fds);
     866           0 :                 unbecome_root();
     867             :         }
     868             : 
     869      437480 :         if (ret == ECONNREFUSED) {
     870             :                 /*
     871             :                  * Linux returns this when a socket exists in the file
     872             :                  * system without a listening process. This is not
     873             :                  * documented in susv4 or the linux manpages, but it's
     874             :                  * easily testable. For the higher levels this is the
     875             :                  * same as "destination does not exist"
     876             :                  */
     877        6074 :                 ret = ENOENT;
     878             :         }
     879             : 
     880      436163 :         return ret;
     881             : }
     882             : 
                       /*
                        * Send an iovec message, with optional file descriptors, to dst using
                        * this context's own id (msg_ctx->id) as the source.
                        *
                        * Calls messaging_send_iov_from() and converts its unix errno result
                        * with map_nt_error_from_unix(); 0 becomes NT_STATUS_OK.
                        */
     883      620810 : NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
     884             :                             struct server_id dst, uint32_t msg_type,
     885             :                             const struct iovec *iov, int iovlen,
     886             :                             const int *fds, size_t num_fds)
     887             : {
     888        1686 :         int ret;
     889             : 
     890      620810 :         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
     891             :                                       iov, iovlen, fds, num_fds);
     892      620810 :         if (ret != 0) {
     893        6076 :                 return map_nt_error_from_unix(ret);
     894             :         }
     895      614734 :         return NT_STATUS_OK;
     896             : }
     897             : 
     898             : struct send_all_state {
                               /*
                                * Arguments of one messaging_send_all() call, handed to
                                * send_all_fn() through its private_data pointer.
                                */
     899             :         struct messaging_context *msg_ctx;
     900             :         int msg_type;
     901             :         const void *buf;
     902             :         size_t len;
     903             : };
     904             : 
                       /*
                        * Per-pid callback passed to messaging_dgm_forall() by
                        * messaging_send_all().
                        *
                        * private_data is a struct send_all_state. The message is sent to pid
                        * with messaging_send_buf() unless pid is the calling process itself.
                        * A send failure is only logged; the function returns 0 in every case
                        * -- presumably so that the iteration continues with the remaining
                        * pids; confirm against messaging_dgm_forall().
                        */
     905       14145 : static int send_all_fn(pid_t pid, void *private_data)
     906             : {
     907       14145 :         struct send_all_state *state = private_data;
     908           6 :         NTSTATUS status;
     909             : 
     910       14145 :         if (pid == tevent_cached_getpid()) {
     911         230 :                 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
     912         230 :                 return 0;
     913             :         }
     914             : 
     915       13915 :         status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
     916       13915 :                                     state->msg_type, state->buf, state->len);
     917       13915 :         if (!NT_STATUS_IS_OK(status)) {
     918        6074 :                 DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
     919             :                             (uintmax_t)pid, nt_errstr(status));
     920             :         }
     921             : 
     922       13910 :         return 0;
     923             : }
     924             : 
                       /*
                        * Send the same message to all other processes.
                        *
                        * When built with CLUSTER_SUPPORT and lp_clustering() is true, the
                        * message (header with a zeroed destination id, followed by buf) is
                        * broadcast through ctdbd_messaging_send_iov() and the function
                        * returns. Otherwise messaging_dgm_forall() is used to call
                        * send_all_fn() for each pid it reports.
                        *
                        * Failures are only logged; there is no return value.
                        */
     925         230 : void messaging_send_all(struct messaging_context *msg_ctx,
     926             :                         int msg_type, const void *buf, size_t len)
     927             : {
     928         230 :         struct send_all_state state = {
     929             :                 .msg_ctx = msg_ctx, .msg_type = msg_type,
     930             :                 .buf = buf, .len = len
     931             :         };
     932           1 :         int ret;
     933             : 
     934             : #ifdef CLUSTER_SUPPORT
     935             :         if (lp_clustering()) {
     936             :                 struct ctdbd_connection *conn = messaging_ctdb_connection();
     937             :                 uint8_t msghdr[MESSAGE_HDR_LENGTH];
     938             :                 struct iovec iov[] = {
     939             :                         { .iov_base = msghdr,
     940             :                           .iov_len = sizeof(msghdr) },
     941             :                         { .iov_base = discard_const_p(void, buf),
     942             :                           .iov_len = len }
     943             :                 };
     944             : 
     945             :                 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
     946             :                                 (struct server_id) {0});
     947             : 
     948             :                 ret = ctdbd_messaging_send_iov(
     949             :                         conn, CTDB_BROADCAST_CONNECTED,
     950             :                         CTDB_SRVID_SAMBA_PROCESS,
     951             :                         iov, ARRAY_SIZE(iov));
     952             :                 if (ret != 0) {
     953             :                         DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
     954             :                                     strerror(ret));
     955             :                 }
     956             : 
     957             :                 return;
     958             :         }
     959             : #endif
     960             : 
     961         230 :         ret = messaging_dgm_forall(send_all_fn, &state);
     962         230 :         if (ret != 0) {
     963           0 :                 DBG_WARNING("messaging_dgm_forall failed: %s\n",
     964             :                             strerror(ret));
     965             :         }
     966         230 : }
     967             : 
                       /*
                        * Duplicate rec onto mem_ctx.
                        *
                        * The copy is a talloc pooled object whose pool is sized for two
                        * sub-allocations totalling payload_len bytes: the message buffer and
                        * the fd array. The struct is copied by value, then buf.data and fds
                        * are replaced by private copies.
                        *
                        * File descriptors are handed over, not shared: after the call every
                        * entry of rec->fds is set to -1 and only the copy still holds them.
                        *
                        * Returns the copy, or NULL if the payload length overflows or the
                        * pooled allocation fails.
                        *
                        * NOTE(review): only the addition is checked for overflow, not the
                        * sizeof(int64_t) * rec->num_fds multiplication -- presumably num_fds
                        * is bounded elsewhere; confirm.
                        */
     968      187757 : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
     969             :                                                struct messaging_rec *rec)
     970             : {
     971         597 :         struct messaging_rec *result;
     972      187757 :         size_t fds_size = sizeof(int64_t) * rec->num_fds;
     973         597 :         size_t payload_len;
     974             : 
     975      187757 :         payload_len = rec->buf.length + fds_size;
     976      187757 :         if (payload_len < rec->buf.length) {
     977             :                 /* overflow */
     978           0 :                 return NULL;
     979             :         }
     980             : 
     981      187757 :         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
     982             :                                       payload_len);
     983      187757 :         if (result == NULL) {
     984           0 :                 return NULL;
     985             :         }
     986      187757 :         *result = *rec;
     987             : 
     988             :         /* Doesn't fail, see talloc_pooled_object */
     989             : 
     990      187757 :         result->buf.data = talloc_memdup(result, rec->buf.data,
     991             :                                          rec->buf.length);
     992             : 
     993      187757 :         result->fds = NULL;
     994      187757 :         if (result->num_fds > 0) {
     995          56 :                 size_t i;
     996             : 
     997        1110 :                 result->fds = talloc_memdup(result, rec->fds, fds_size);
     998             : 
     999        2223 :                 for (i=0; i<rec->num_fds; i++) {
    1000             :                         /*
    1001             :                          * fd's can only exist once
    1002             :                          */
    1003        1113 :                         rec->fds[i] = -1;
    1004             :                 }
    1005             :         }
    1006             : 
    1007      187160 :         return result;
    1008             : }
    1009             : 
    1010             : struct messaging_filtered_read_state {
                               /* Per-request state of messaging_filtered_read_send/_recv. */
                       
                               /* Event context the request was created with. */
    1011             :         struct tevent_context *ev;
    1012             :         struct messaging_context *msg_ctx;
                               /*
                                * Handles returned by messaging_dgm_register_tevent_context()
                                * and, when clustering is enabled,
                                * messaging_ctdb_register_tevent_context(); freed in the
                                * cleanup function.
                                */
    1013             :         struct messaging_dgm_fde *fde;
    1014             :         struct messaging_ctdb_fde *cluster_fde;
    1015             : 
                               /* Caller-supplied predicate and its opaque argument. */
    1016             :         bool (*filter)(struct messaging_rec *rec, void *private_data);
    1017             :         void *private_data;
    1018             : 
                               /* Copy of the delivered message, set by messaging_filtered_read_done(). */
    1019             :         struct messaging_rec *rec;
    1020             : };
    1021             : 
    1022             : static void messaging_filtered_read_cleanup(struct tevent_req *req,
    1023             :                                             enum tevent_req_state req_state);
    1024             : 
                       /*
                        * Start an asynchronous read filtered by the caller's predicate.
                        *
                        * Creates a tevent request, stores filter/private_data in its state,
                        * registers ev with the dgm transport (and with ctdb when
                        * lp_clustering() is true), appends the request to
                        * msg_ctx->new_waiters, installs messaging_filtered_read_cleanup() as
                        * cleanup function and registers ev with the messaging context.
                        *
                        * Returns the request, or NULL if the request itself cannot be
                        * allocated. Later allocation failures are reported through the
                        * request, which is returned via tevent_req_post().
                        *
                        * NOTE(review): the cleanup function is installed before
                        * messaging_register_event_context() is called. If that call fails,
                        * tevent_req_oom() presumably runs the cleanup function, which
                        * abort()s when messaging_deregister_event_context() returns false --
                        * verify that this path cannot hit abort().
                        */
    1025      153800 : struct tevent_req *messaging_filtered_read_send(
    1026             :         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
    1027             :         struct messaging_context *msg_ctx,
    1028             :         bool (*filter)(struct messaging_rec *rec, void *private_data),
    1029             :         void *private_data)
    1030             : {
    1031        3800 :         struct tevent_req *req;
    1032        3800 :         struct messaging_filtered_read_state *state;
    1033        3800 :         size_t new_waiters_len;
    1034        3800 :         bool ok;
    1035             : 
    1036      153800 :         req = tevent_req_create(mem_ctx, &state,
    1037             :                                 struct messaging_filtered_read_state);
    1038      153800 :         if (req == NULL) {
    1039           0 :                 return NULL;
    1040             :         }
    1041      153800 :         state->ev = ev;
    1042      153800 :         state->msg_ctx = msg_ctx;
    1043      153800 :         state->filter = filter;
    1044      153800 :         state->private_data = private_data;
    1045             : 
    1046             :         /*
    1047             :          * We have to defer the callback here, as we might be called from
    1048             :          * within a different tevent_context than state->ev
    1049             :          */
    1050      153800 :         tevent_req_defer_callback(req, state->ev);
    1051             : 
    1052      153800 :         state->fde = messaging_dgm_register_tevent_context(state, ev);
    1053      153800 :         if (tevent_req_nomem(state->fde, req)) {
    1054           0 :                 return tevent_req_post(req, ev);
    1055             :         }
    1056             : 
    1057      153800 :         if (lp_clustering()) {
    1058           0 :                 state->cluster_fde =
    1059           0 :                         messaging_ctdb_register_tevent_context(state, ev);
    1060           0 :                 if (tevent_req_nomem(state->cluster_fde, req)) {
    1061           0 :                         return tevent_req_post(req, ev);
    1062             :                 }
    1063             :         }
    1064             : 
    1065             :         /*
    1066             :          * We add ourselves to the "new_waiters" array, not the "waiters"
    1067             :          * array. If we are called from within messaging_read_done,
    1068             :          * messaging_dispatch_rec will be in an active for-loop on
    1069             :          * "waiters". We must be careful not to mess with this array, because
    1070             :          * it could mean that a single event is being delivered twice.
    1071             :          */
    1072             : 
    1073      153800 :         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
    1074             : 
                               /* Grow the array by one slot only when it is completely full. */
    1075      153800 :         if (new_waiters_len == msg_ctx->num_new_waiters) {
    1076        3734 :                 struct tevent_req **tmp;
    1077             : 
    1078      133290 :                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
    1079             :                                      struct tevent_req *, new_waiters_len+1);
    1080      133290 :                 if (tevent_req_nomem(tmp, req)) {
    1081           0 :                         return tevent_req_post(req, ev);
    1082             :                 }
    1083      133290 :                 msg_ctx->new_waiters = tmp;
    1084             :         }
    1085             : 
    1086      153800 :         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
    1087      153800 :         msg_ctx->num_new_waiters += 1;
    1088      153800 :         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
    1089             : 
    1090      153800 :         ok = messaging_register_event_context(msg_ctx, ev);
    1091      153800 :         if (!ok) {
    1092           0 :                 tevent_req_oom(req);
    1093           0 :                 return tevent_req_post(req, ev);
    1094             :         }
    1095             : 
    1096      150000 :         return req;
    1097             : }
    1098             : 
                       /*
                        * Cleanup function installed on filtered-read requests.
                        *
                        * Unsets itself as cleanup function, frees the dgm and ctdb event
                        * handles, deregisters state->ev from the messaging context (abort()
                        * if that fails) and clears the request's slot in msg_ctx->waiters
                        * or, failing that, in msg_ctx->new_waiters. Slots are set to NULL
                        * rather than removed; the arrays are not compacted here.
                        *
                        * req_state is not used.
                        */
    1099      147204 : static void messaging_filtered_read_cleanup(struct tevent_req *req,
    1100             :                                             enum tevent_req_state req_state)
    1101             : {
    1102      147204 :         struct messaging_filtered_read_state *state = tevent_req_data(
    1103             :                 req, struct messaging_filtered_read_state);
    1104      147204 :         struct messaging_context *msg_ctx = state->msg_ctx;
    1105        3665 :         size_t i;
    1106        3665 :         bool ok;
    1107             : 
    1108      147204 :         tevent_req_set_cleanup_fn(req, NULL);
    1109             : 
    1110      147204 :         TALLOC_FREE(state->fde);
    1111      147204 :         TALLOC_FREE(state->cluster_fde);
    1112             : 
    1113      147204 :         ok = messaging_deregister_event_context(msg_ctx, state->ev);
    1114      147204 :         if (!ok) {
    1115           0 :                 abort();
    1116             :         }
    1117             : 
    1118             :         /*
    1119             :          * Just set the [new_]waiters entry to NULL, be careful not to mess
    1120             :          * with the other "waiters" array contents. We are often called from
    1121             :          * within "messaging_dispatch_rec", which loops over
    1122             :          * "waiters". Messing with the "waiters" array will mess up that
    1123             :          * for-loop.
    1124             :          */
    1125             : 
    1126      208740 :         for (i=0; i<msg_ctx->num_waiters; i++) {
    1127       82357 :                 if (msg_ctx->waiters[i] == req) {
    1128       20821 :                         msg_ctx->waiters[i] = NULL;
    1129       20821 :                         return;
    1130             :                 }
    1131             :         }
    1132             : 
    1133      500649 :         for (i=0; i<msg_ctx->num_new_waiters; i++) {
    1134      500649 :                 if (msg_ctx->new_waiters[i] == req) {
    1135      126383 :                         msg_ctx->new_waiters[i] = NULL;
    1136      126383 :                         return;
    1137             :                 }
    1138             :         }
    1139             : }
    1140             : 
    1141        4269 : static void messaging_filtered_read_done(struct tevent_req *req,
    1142             :                                          struct messaging_rec *rec)
    1143             : {       /*
                     :          * Finish a filtered-read request with a private copy of rec.
                     :          *
                     :          * req: filtered-read request to complete
                     :          * rec: record to hand over; it is duplicated with
                     :          *      messaging_rec_dup() using the request state as talloc
                     :          *      parent, and only the duplicate is stored in state->rec
                     :          */
    1144        4269 :         struct messaging_filtered_read_state *state = tevent_req_data(
    1145             :                 req, struct messaging_filtered_read_state);
    1146             : 
    1147        4269 :         state->rec = messaging_rec_dup(state, rec);
    1148        4269 :         if (tevent_req_nomem(state->rec, req)) {
    1149           0 :                 return; /* no duplicate: tevent_req_done() is not called */
    1150             :         }
    1151        4269 :         tevent_req_done(req);
    1152             : }
    1153             : 
    1154        4269 : int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
    1155             :                                  struct messaging_rec **presult)
    1156             : {       /*
                     :          * Collect the result of a filtered-read request.
                     :          *
                     :          * req:     the finished request
                     :          * mem_ctx: talloc context the record is moved to
                     :          * presult: receives the record; may be NULL, in which case the
                     :          *          record is not moved out of the request state
                     :          *
                     :          * Returns 0 on success, otherwise the error value reported by
                     :          * tevent_req_is_unix_error(). tevent_req_received() is called on
                     :          * both paths.
                     :          */
    1157        4269 :         struct messaging_filtered_read_state *state = tevent_req_data(
    1158             :                 req, struct messaging_filtered_read_state);
    1159         228 :         int err;
    1160             : 
    1161        4269 :         if (tevent_req_is_unix_error(req, &err)) {
    1162           0 :                 tevent_req_received(req);
    1163           0 :                 return err;
    1164             :         }
    1165        4269 :         if (presult != NULL) {
    1166        4269 :                 *presult = talloc_move(mem_ctx, &state->rec);
    1167             :         }
    1168        4269 :         tevent_req_received(req);
    1169        4269 :         return 0;
    1170             : }
    1171             : 
    1172             : struct messaging_read_state {   /* request state created by messaging_read_send() */
    1173             :         uint32_t msg_type;      /* message type compared in messaging_read_filter() */
    1174             :         struct messaging_rec *rec;      /* result, filled in by messaging_read_done() */
    1175             : };
    1176             : 
    1177             : static bool messaging_read_filter(struct messaging_rec *rec,
    1178             :                                   void *private_data);  /* filter handed to messaging_filtered_read_send() */
    1179             : static void messaging_read_done(struct tevent_req *subreq);     /* callback set on the filtered-read subrequest */
    1180             : 
    1181       34868 : struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
    1182             :                                        struct tevent_context *ev,
    1183             :                                        struct messaging_context *msg,
    1184             :                                        uint32_t msg_type)
    1185             : {       /*
                     :          * Start an async read for one message of type msg_type.
                     :          *
                     :          * Built on messaging_filtered_read_send() with
                     :          * messaging_read_filter() as the filter and this request's
                     :          * state as the filter's private data.
                     :          *
                     :          * mem_ctx:  talloc parent of the new request
                     :          * ev:       event context passed to the filtered read
                     :          * msg:      messaging context passed to the filtered read
                     :          * msg_type: message type stored in the state for the filter
                     :          *
                     :          * Returns NULL if tevent_req_create() fails, otherwise the
                     :          * request. If the subrequest could not be created the request
                     :          * is returned through tevent_req_post().
                     :          */
    1186         949 :         struct tevent_req *req, *subreq;
    1187         949 :         struct messaging_read_state *state;
    1188             : 
    1189       34868 :         req = tevent_req_create(mem_ctx, &state,
    1190             :                                 struct messaging_read_state);
    1191       34868 :         if (req == NULL) {
    1192           0 :                 return NULL;
    1193             :         }
    1194       34868 :         state->msg_type = msg_type;
    1195             : 
    1196       34868 :         subreq = messaging_filtered_read_send(state, ev, msg,
    1197             :                                               messaging_read_filter, state);
    1198       34868 :         if (tevent_req_nomem(subreq, req)) {
    1199           0 :                 return tevent_req_post(req, ev);
    1200             :         }
    1201       34868 :         tevent_req_set_callback(subreq, messaging_read_done, req);
    1202       34868 :         return req;
    1203             : }
    1204             : 
    1205       25101 : static bool messaging_read_filter(struct messaging_rec *rec,
    1206             :                                   void *private_data)
    1207             : {       /*
                     :          * Filter used by messaging_read_send().
                     :          *
                     :          * rec:          record to test
                     :          * private_data: must be a struct messaging_read_state
                     :          *               (checked with talloc_get_type_abort())
                     :          *
                     :          * Returns true only for records that carry no file descriptors
                     :          * and whose msg_type equals the type stored in the state.
                     :          */
    1208       25101 :         struct messaging_read_state *state = talloc_get_type_abort(
    1209             :                 private_data, struct messaging_read_state);
    1210             : 
    1211       25101 :         if (rec->num_fds != 0) {
    1212         696 :                 return false;   /* records with fds are never accepted here */
    1213             :         }
    1214             : 
    1215       24403 :         return rec->msg_type == state->msg_type;
    1216             : }
    1217             : 
    1218         172 : static void messaging_read_done(struct tevent_req *subreq)
    1219             : {       /*
                     :          * Completion callback of the filtered-read subrequest started
                     :          * in messaging_read_send().
                     :          *
                     :          * The received record is moved onto this request's state
                     :          * (state->rec), the subrequest is freed, and the outer request
                     :          * is finished with either the subrequest's error or success.
                     :          */
    1220         172 :         struct tevent_req *req = tevent_req_callback_data(
    1221             :                 subreq, struct tevent_req);
    1222         172 :         struct messaging_read_state *state = tevent_req_data(
    1223             :                 req, struct messaging_read_state);
    1224         112 :         int ret;
    1225             : 
    1226         172 :         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
    1227         172 :         TALLOC_FREE(subreq);
    1228         172 :         if (tevent_req_error(req, ret)) {
    1229           0 :                 return;
    1230             :         }
    1231         172 :         tevent_req_done(req);
    1232             : }
    1233             : 
    1234         172 : int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
    1235             :                         struct messaging_rec **presult)
    1236             : {       /*
                     :          * Collect the result of messaging_read_send().
                     :          *
                     :          * req:     the finished request
                     :          * mem_ctx: talloc context the record is moved to
                     :          * presult: receives the record; may be NULL, in which case the
                     :          *          record stays attached to the request state
                     :          *
                     :          * Returns 0 on success, otherwise the error value reported by
                     :          * tevent_req_is_unix_error(). Unlike
                     :          * messaging_filtered_read_recv(), this function does not call
                     :          * tevent_req_received().
                     :          */
    1237         172 :         struct messaging_read_state *state = tevent_req_data(
    1238             :                 req, struct messaging_read_state);
    1239         112 :         int err;
    1240             : 
    1241         172 :         if (tevent_req_is_unix_error(req, &err)) {
    1242           0 :                 return err;
    1243             :         }
    1244         172 :         if (presult != NULL) {
    1245          70 :                 *presult = talloc_move(mem_ctx, &state->rec);
    1246             :         }
    1247          60 :         return 0;
    1248             : }
    1249             : 
    1250       64580 : static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
    1251             : {       /*
                     :          * Move all entries of msg_ctx->new_waiters to the end of
                     :          * msg_ctx->waiters.
                     :          *
                     :          * Returns true on success, including when there is nothing to
                     :          * move. Returns false if growing "waiters" fails; in that case
                     :          * neither array nor either counter has been changed.
                     :          */
    1252       64580 :         if (msg_ctx->num_new_waiters == 0) {
    1253       59506 :                 return true;
    1254             :         }
    1255             : 
    1256        5070 :         if (talloc_array_length(msg_ctx->waiters) <
    1257        5070 :             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
    1258         217 :                 struct tevent_req **tmp;        /* keeps msg_ctx->waiters intact if realloc fails */
    1259        3102 :                 tmp = talloc_realloc(
    1260             :                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
    1261             :                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
    1262        3102 :                 if (tmp == NULL) {
    1263           0 :                         DEBUG(1, ("%s: talloc failed\n", __func__));
    1264           0 :                         return false;
    1265             :                 }
    1266        3102 :                 msg_ctx->waiters = tmp;
    1267             :         }
    1268             : 
    1269        5070 :         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
    1270        5070 :                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
    1271             : 
    1272        5070 :         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
    1273        5070 :         msg_ctx->num_new_waiters = 0;   /* the new_waiters array itself is kept, only the count is reset */
    1274             : 
    1275        5070 :         return true;
    1276             : }
    1277             : 
    1278      269018 : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
    1279             :                                        struct messaging_rec *rec)
    1280             : {       /*
                     :          * Hand rec to the first callback in msg_ctx->callbacks whose
                     :          * msg_type equals rec->msg_type.
                     :          *
                     :          * Before the callback runs, every fd in rec is closed and
                     :          * rec->num_fds / rec->fds are reset, so rec is modified.
                     :          *
                     :          * Returns true if a callback was called (only one is ever
                     :          * called per record), false if no callback matched.
                     :          */
    1281         711 :         struct messaging_callback *cb, *next;
    1282             : 
    1283     1420919 :         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
    1284        5292 :                 size_t j;
    1285             : 
    1286     1356520 :                 next = cb->next;
    1287     1356520 :                 if (cb->msg_type != rec->msg_type) {
    1288     1151901 :                         continue;
    1289             :                 }
    1290             : 
    1291             :                 /*
    1292             :                  * the old style callbacks don't support fd passing
    1293             :                  */
    1294      204619 :                 for (j=0; j < rec->num_fds; j++) {
    1295           0 :                         int fd = rec->fds[j];
    1296           0 :                         close(fd);
    1297             :                 }
    1298      204619 :                 rec->num_fds = 0;
    1299      204619 :                 rec->fds = NULL;
    1300             : 
    1301      204619 :                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
    1302             :                        rec->src, &rec->buf);
    1303             : 
    1304      204615 :                 return true;
    1305             :         }
    1306             : 
    1307       64164 :         return false;
    1308             : }
    1309             : 
    1310       64580 : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
    1311             :                                        struct tevent_context *ev,
    1312             :                                        struct messaging_rec *rec)
    1313             : {       /*
                     :          * Offer rec to the filtered-read waiters, in array order.
                     :          *
                     :          * Pending new_waiters are appended to "waiters" first. The
                     :          * first waiter whose event context is ev and whose filter
                     :          * accepts rec is completed via messaging_filtered_read_done().
                     :          *
                     :          * Returns true if a waiter took the record. Returns false if
                     :          * none did, and also if appending the new waiters failed; the
                     :          * caller cannot tell these two cases apart.
                     :          */
    1314         235 :         size_t i;
    1315             : 
    1316       64580 :         if (!messaging_append_new_waiters(msg_ctx)) {
    1317           0 :                 return false;
    1318             :         }
    1319             : 
    1320       64345 :         i = 0;
    1321      287376 :         while (i < msg_ctx->num_waiters) {
    1322         889 :                 struct tevent_req *req;
    1323         889 :                 struct messaging_filtered_read_state *state;
    1324             : 
    1325      227065 :                 req = msg_ctx->waiters[i];
    1326      227065 :                 if (req == NULL) {
    1327             :                         /*
    1328             :                          * This got cleaned up. In the meantime,
    1329             :                          * move everything down one. We need
    1330             :                          * to keep the order of waiters, as
    1331             :                          * other code may depend on this.
    1332             :                          */
    1333        6287 :                         ARRAY_DEL_ELEMENT(
    1334         227 :                                 msg_ctx->waiters, i, msg_ctx->num_waiters);
    1335        6287 :                         msg_ctx->num_waiters -= 1;
    1336        6287 :                         continue;       /* i is not advanced: slot i now holds the next entry */
    1337             :                 }
    1338             : 
    1339      220778 :                 state = tevent_req_data(
    1340             :                         req, struct messaging_filtered_read_state);
    1341      440883 :                 if ((ev == state->ev) &&
    1342      220105 :                     state->filter(rec, state->private_data)) {
    1343        4269 :                         messaging_filtered_read_done(req, rec);
    1344        4269 :                         return true;
    1345             :                 }
    1346             : 
    1347      216509 :                 i += 1;
    1348             :         }
    1349             : 
    1350       60304 :         return false;
    1351             : }
    1352             : 
    1353             : /*
    1354             :   Dispatch one messaging_rec
                     : 
                     :   Order of delivery:
                     :   1. If ev is the main event context (msg_ctx->event_ctx), the
                     :      per-message-type callbacks are tried via
                     :      messaging_dispatch_classic().
                     :   2. Otherwise, or if no callback matched, the filtered-read
                     :      waiters are tried via messaging_dispatch_waiters().
                     :   3. If the record is still unconsumed and ev is not the main
                     :      event context, the record (payload and fds) is re-posted to
                     :      ourselves with messaging_post_self().
                     : 
                     :   Nothing is returned: an unconsumed record and a failed re-post
                     :   are not reported to the caller.
    1355             : */
    1356       85770 : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
    1357             :                                    struct tevent_context *ev,
    1358             :                                    struct messaging_rec *rec)
    1359             : {
    1360         345 :         bool consumed;
    1361         345 :         size_t i;
    1362             : 
    1363       85770 :         if (ev == msg_ctx->event_ctx) {
    1364       85589 :                 consumed = messaging_dispatch_classic(msg_ctx, rec);
    1365       85589 :                 if (consumed) {
    1366       42993 :                         return;
    1367             :                 }
    1368             :         }
    1369             : 
    1370       42651 :         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
    1371       42651 :         if (consumed) {
    1372        2611 :                 return;
    1373             :         }
    1374             : 
    1375       39827 :         if (ev != msg_ctx->event_ctx) {
    1376           0 :                 struct iovec iov;
    1377         158 :                 int fds[MAX(1, rec->num_fds)];  /* MAX(1, ..) keeps the array length at least 1 */
    1378           0 :                 int ret;
    1379             : 
    1380             :                 /*
    1381             :                  * We've been listening on a nested event
    1382             :                  * context. Messages need to be handled in the main
    1383             :                  * event context, so post to ourselves
    1384             :                  */
    1385             : 
    1386         158 :                 iov.iov_base = rec->buf.data;
    1387         158 :                 iov.iov_len = rec->buf.length;
    1388             : 
    1389         158 :                 for (i=0; i<rec->num_fds; i++) {
    1390           0 :                         fds[i] = rec->fds[i];
    1391             :                 }
    1392             : 
    1393         158 :                 ret = messaging_post_self(
    1394         158 :                         msg_ctx, rec->src, rec->dest, rec->msg_type,
    1395         158 :                         &iov, 1, fds, rec->num_fds);
    1396         158 :                 if (ret == 0) {
    1397         158 :                         return;
    1398             :                 }       /* ret != 0 falls through to the end of the function as well */
    1399             :         }
    1400             : }
    1401             : 
    1402             : static int mess_parent_dgm_cleanup(void *private_data);        /* job function passed to background_job_send() */
    1403             : static void mess_parent_dgm_cleanup_done(struct tevent_req *req);       /* callback set on the background job request */
    1404             : 
    1405          89 : bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
    1406             : {       /*
                     :          * Start the background job that runs mess_parent_dgm_cleanup().
                     :          *
                     :          * The job is created on msg->event_ctx with msg as talloc
                     :          * parent. The interval argument comes from the parametric
                     :          * option "messaging:messaging dgm cleanup interval" with a
                     :          * default of 60*15 (presumably seconds - confirm against
                     :          * background_job_send()).
                     :          *
                     :          * Returns false if background_job_send() returned NULL, true
                     :          * otherwise.
                     :          */
    1407           0 :         struct tevent_req *req;
    1408             : 
    1409          89 :         req = background_job_send(
    1410             :                 msg, msg->event_ctx, msg, NULL, 0,
    1411          89 :                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
    1412             :                             60*15),
    1413             :                 mess_parent_dgm_cleanup, msg);
    1414          89 :         if (req == NULL) {
    1415           0 :                 DBG_WARNING("background_job_send failed\n");
    1416           0 :                 return false;
    1417             :         }
    1418          89 :         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
    1419          89 :         return true;
    1420             : }
    1421             : 
    1422           0 : static int mess_parent_dgm_cleanup(void *private_data)
    1423             : {       /*
                     :          * Body of the dgm cleanup background job: calls
                     :          * messaging_dgm_wipe() and logs its result at debug level 10.
                     :          *
                     :          * private_data is not used.
                     :          *
                     :          * Returns the configured "messaging dgm cleanup interval"
                     :          * (default 60*15), whether or not the wipe succeeded.
                     :          * NOTE(review): presumably background_job uses the return
                     :          * value as the delay before the next run - confirm.
                     :          */
    1424           0 :         int ret;
    1425             : 
    1426           0 :         ret = messaging_dgm_wipe();
    1427           0 :         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
    1428             :                    ret ? strerror(ret) : "ok"));
    1429           0 :         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
    1430             :                            60*15);
    1431             : }
    1432             : 
    1433           0 : static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
    1434             : {       /*
                     :          * Called when the dgm cleanup background job ends: logs the
                     :          * job's final status and starts a new job with the same
                     :          * arguments as messaging_parent_dgm_cleanup_init().
                     :          *
                     :          * If the restart fails this is only logged; no further
                     :          * cleanup job is scheduled from here.
                     :          */
    1435           0 :         struct messaging_context *msg = tevent_req_callback_data(
    1436             :                 req, struct messaging_context);
    1437           0 :         NTSTATUS status;
    1438             : 
    1439           0 :         status = background_job_recv(req);
    1440           0 :         TALLOC_FREE(req);
    1441           0 :         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
    1442             :                   nt_errstr(status)));
    1443             : 
    1444           0 :         req = background_job_send(
    1445             :                 msg, msg->event_ctx, msg, NULL, 0,
    1446           0 :                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
    1447             :                             60*15),
    1448             :                 mess_parent_dgm_cleanup, msg);
    1449           0 :         if (req == NULL) {
    1450           0 :                 DEBUG(1, ("background_job_send failed\n"));
    1451           0 :                 return;
    1452             :         }
    1453           0 :         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
    1454             : }
    1455             : 
    1456           0 : int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
    1457             : {       /*
                     :          * Run the dgm cleanup for one pid, or the full wipe.
                     :          *
                     :          * msg_ctx: not used by this function
                     :          * pid:     0 selects messaging_dgm_wipe(); any other value is
                     :          *          passed to messaging_dgm_cleanup()
                     :          *
                     :          * Returns the value of whichever of the two calls was made.
                     :          */
    1458           0 :         int ret;
    1459             : 
    1460           0 :         if (pid == 0) {
    1461           0 :                 ret = messaging_dgm_wipe();
    1462             :         } else {
    1463           0 :                 ret = messaging_dgm_cleanup(pid);
    1464             :         }
    1465             : 
    1466           0 :         return ret;
    1467             : }
    1468             : 
    1469       79275 : struct tevent_context *messaging_tevent_context(
    1470             :         struct messaging_context *msg_ctx)
    1471             : {       /* Accessor: returns msg_ctx->event_ctx; msg_ctx must not be NULL. */
    1472       79275 :         return msg_ctx->event_ctx;
    1473             : }
    1474             : 
    1475       24442 : struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
    1476             : {       /* Accessor: returns msg_ctx->names_db; msg_ctx must not be NULL. */
    1477       24442 :         return msg_ctx->names_db;
    1478             : }
    1479             : 
    1480             : /** @} **/

Generated by: LCOV version 1.14