.. _ldms_msg:

===========
ldms_msg
===========

----------------------------------------------
publish/subscribe data in LDMS message service
----------------------------------------------

:Date: 2024-12-03
:Manual section: 7
:Manual group: LDMS
:Version: LDMS 4.5

SYNOPSIS
========

ldmsd_controller commands
-------------------------

.. parsed-literal::

    ``prdcr_subscribe`` ``regex``\ =\ `PRDCR_REGEX` ``message_tag``\ =\ `NAME_REGEX`

    ``prdcr_unsubscribe`` ``regex``\ =\ `PRDCR_REGEX` ``message_tag``\ =\ `NAME_REGEX`

C API
-----

.. code:: c

    #include "ldms.h"

    int ldms_msg_publish(ldms_t x, const char *msg_tag,
                         ldms_msg_type_t msg_type,
                         ldms_cred_t cred,
                         uint32_t perm,
                         const char *data, size_t data_len);

    typedef int (*ldms_msg_event_cb_t)(ldms_msg_event_t ev, void *cb_arg);

    ldms_msg_client_t ldms_msg_subscribe(const char *match, int is_regex,
                                         ldms_msg_event_cb_t cb_fn, void *cb_arg,
                                         const char *desc);

    void ldms_msg_client_close(ldms_msg_client_t c);

    int ldms_msg_remote_subscribe(ldms_t x, const char *match, int is_regex,
                                  ldms_msg_event_cb_t cb_fn, void *cb_arg,
                                  int64_t rate);

    int ldms_msg_remote_unsubscribe(ldms_t x, const char *match, int is_regex,
                                    ldms_msg_event_cb_t cb_fn, void *cb_arg);

    /* See "ldms.h" for the detailed API documentation */

Python APIs
-----------

.. code:: python

    from ovis_ldms import ldms

    ldms.msg_publish(msg_tag=, msg_data=, msg_type=, perm=)

    xprt = ldms.Xprt()
    xprt.connect(host="node0", port=411)
    xprt.msg_publish(msg_tag=, msg_data=, msg_type=, perm=)
    xprt.msg_subscribe(match=, is_regex=)
    xprt.msg_unsubscribe(match=, is_regex=)

    cli = ldms.MsgClient(match=, is_regex=, cb=, cb_arg=)

    # MsgClient callback signature
    def cb(MsgClient client, MsgData data, object cb_arg)

    data = cli.get_data()

    cli.close()

    # for more detailed description and usage
    help(ldms)

DESCRIPTION
===========

LDMS Message Service is a service in LDMS for publishing variable-length
data to LDMS processes, and for receiving such data from LDMS processes via
message subscription.
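A subscription selects messages by tag: ``match`` is either an exact tag
or, when ``is_regex`` is nonzero, a regular expression. The fragment below
is a minimal sketch of that selection rule, not the ``ldms`` library's code.
The helper ``tag_selected()`` is hypothetical, and the sketch assumes POSIX
extended regular expressions applied unanchored (so ``app.*`` would also
select ``myapp0``); whether the library anchors patterns is not stated in
this page.

.. code:: c

    /* Hypothetical sketch of tag selection; NOT the ldms implementation. */
    #include <regex.h>
    #include <stdbool.h>
    #include <string.h>

    /* Return true if `tag` is selected by a subscription made with
     * `match` and `is_regex`.  Regex mode assumes POSIX extended syntax,
     * unanchored: regexec() succeeds if the pattern matches anywhere. */
    bool tag_selected(const char *match, int is_regex, const char *tag)
    {
            regex_t re;
            bool hit;

            if (!is_regex)
                    return strcmp(match, tag) == 0; /* exact tag only */

            if (regcomp(&re, match, REG_EXTENDED | REG_NOSUB) != 0)
                    return false; /* an invalid pattern selects nothing */
            hit = regexec(&re, tag, 0, NULL, 0) == 0;
            regfree(&re);
            return hit;
    }

For example, ``tag_selected("s0", 0, "s1")`` is false, while
``tag_selected("app.*", 1, "app0")`` is true; with ``is_regex`` set to 0,
``"app.*"`` is compared literally and selects only a tag spelled ``app.*``.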
When published data arrive at an LDMS process, every message client
(``ldms_msg_client_t``) in that process that subscribes to the tag and is
authorized to see the data receives it through its callback function
``cb_fn()``. If the LDMS process has remote subscribers, the data are also
forwarded to those that are allowed to see them.

An LDMS Daemon (``ldmsd``) has to be configured with ``prdcr_subscribe``
commands in order to receive message data from its producers (``prdcr``).
``prdcr_subscribe`` can be issued many times, e.g.

.. code:: sh

    # subscribe "s0" message tag on all producers
    prdcr_subscribe regex=.* message_tag=s0

    # subscribe "s1" message tag on all producers
    prdcr_subscribe regex=.* message_tag=s1

The ``message_tag`` parameter can also be a regular expression, e.g.

.. code:: sh

    # subscribe message tags matching "app.*" or "sys.*" on all producers
    prdcr_subscribe regex=.* message_tag=app.*
    prdcr_subscribe regex=.* message_tag=sys.*

This is the setup for the following figure:

- ``bob_app``: an application run by ``bob``. It LDMS-connects to ``samp``.
- ``samp``: an LDMS daemon (sampler).
- A plugin in ``samp`` has an LDMS Message Client ``cli`` that subscribes to
  all message tags (regex ``.*``).
- Another plugin ``plug0`` in ``samp`` publishes to the ``s1`` message tag.
- ``agg``: another LDMS daemon (aggregator). It has an LDMS connection to
  ``samp``.
- ``agg`` subscribes to ``.*`` message tags on ``samp`` with the following
  command:

  - ``prdcr_subscribe regex=samp message_tag=.*``

- ``alice_app``: an application run by ``alice`` that LDMS-connects to
  ``agg``.
- ``alice_app`` subscribes to ``s0`` on ``agg`` (remote subscription).
- ``alice_app`` has an LDMS Message Client ``cli`` that subscribes to the
  ``s0`` message tag (``cli:s0`` in the figure).

The ``-->`` arrows illustrate possible message data paths.

::

                      ┌──────────────┐         ┌────────┐
    ┌───────────┐     │     samp     │         │  agg   │
    │bob_app    │     ├──────────────┤         ├────────┤
    ├───────────┤     │   .----.     │         │ .----. │
    │           │  .----->|ldms|---------------->|ldms| │
    │publish(s0)│  |  │   '----'<---.│         │ '----' │
    │   |       │  |  │     |       |│         └────|───┘
    │   v       │  |  │.----'       |│      .-------'
    │.----.     │  |  │| .------.   |│      | ┌────────────┐
    │|ldms|--------'  │| |cli:.*|   |│      | │ alice_app  │
    │'----'     │     │| |------|   |│      | ├────────────┤
    └───────────┘     │'>|cb_fn |   |│      | │   .----.   │
                      │  '------'   |│      '---->|ldms|--.│
                      │             |│        │   '----'  |│
                      │             |│        │           |│
                      │.-----------.|│        │           |│
                      │| plug0     ||│        │  .------. |│
                      │|-----------||│        │  |cli:s0| |│
                      │|publish(s1)|'│        │  |------| |│
                      │'-----------' │        │  |cb_fn |<'│
                      └──────────────┘        │  '------'  │
                                              └────────────┘

``bob_app`` publishes a message by calling the ``ldms_msg_publish()``
function. Suppose that ``bob_app`` publishes an ``s0`` message over its LDMS
transport to ``samp`` with ``0400`` permission. When the ``s0`` message from
``bob_app`` arrives at the ``samp`` daemon, the ``ldms`` library does the
following:

1. **Credential check**: The ``ldms`` library checks the credential in the
   message against the credential of the transport. If they differ, the
   message is dropped to prevent user impersonation. The exception is
   ``root``, which may impersonate any user so that ``ldmsd`` daemons can
   propagate user messages as the original user.
2. **Client iteration**: The ``ldms`` library goes through all clients that
   subscribe to the ``s0`` tag, including the clients whose regular
   expression matches ``s0``.
3. **Authorization check**: For each of these clients, the ``ldms`` library
   checks whether the client may see the data, using the credential of the
   client together with the credential and permission in the message.
4. **Callback**: The ``cb_fn()`` of each authorized client is called.

The information available in the message callback event includes the message
tag, the message data, and the original publisher's ``uid``, ``gid``, and
address.

Currently, any user can publish data to any tag; it is up to the receiving
side to decide what to do with the data.
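The authorization check in step 3 can be pictured with Unix-style read
bits: ``0400`` lets only the publisher's ``uid`` see the data, while ``0444``
lets everyone see it. The sketch below assumes exactly those semantics
(owner bits, then group bits, then other bits) plus a ``root`` bypass. The
``cred_sketch`` struct and ``may_read()`` function are hypothetical, and only
primary group IDs are compared, so treat it as an illustration rather than
the library's actual rule.

.. code:: c

    /* Hypothetical sketch of the step-3 authorization check, assuming
     * Unix-style owner/group/other read bits.  Not the ldms source. */
    #include <stdbool.h>
    #include <stdint.h>
    #include <sys/types.h>

    struct cred_sketch {          /* hypothetical stand-in for a credential */
            uid_t uid;
            gid_t gid;
    };

    /* May a subscriber with credential `sub` see a message published
     * by `pub` with permission bits `perm` (e.g. 0400, 0440, 0444)? */
    bool may_read(const struct cred_sketch *sub,
                  const struct cred_sketch *pub, uint32_t perm)
    {
            if (sub->uid == 0)
                    return true;                /* assumed root bypass */
            if (sub->uid == pub->uid)
                    return (perm & 0400) != 0;  /* owner read bit */
            if (sub->gid == pub->gid)
                    return (perm & 0040) != 0;  /* group read bit */
            return (perm & 0004) != 0;          /* other read bit */
    }

With ``bob`` as the publisher, ``may_read(&alice, &bob, 0400)`` is false and
``may_read(&alice, &bob, 0444)`` is true, which mirrors how the ``s0``
message is treated on its way to ``alice_app``.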
In this particular case, there are two clients on ``samp``: the ``cli`` that
subscribes to all tags (regex ``.*``), and a *hidden* client for the remote
subscription (the remote client, for short), which was created when ``samp``
received a subscription request message from ``agg`` (issued by the
``prdcr_subscribe`` command in ``agg``). The ``cb_fn()`` of the remote client
is an internal function in the LDMS library that forwards the message to the
subscribing peer. Note that the credential of the remote client is the
credential obtained from the LDMS transport authentication.

Now the ``s0`` message has reached ``agg``, which has only one remote client:
``alice_app``, subscribing to the ``s0`` tag. The ``ldms`` library in ``agg``
will NOT forward this particular message to ``alice_app`` because
``bob_app``, the original publisher, set ``0400`` permission. If ``bob_app``
published another message on the ``s0`` tag to ``samp`` with ``0444``
permission, ``agg`` would forward it to ``alice_app`` when it arrived, and
the ``cb_fn()`` in ``alice_app`` would be called once the ``s0`` data reached
it.

On another path, consider ``publish(s1)`` in the ``plug0`` plugin in the
``samp`` process. When ``plug0`` publishes ``s1`` with a ``NULL`` transport
(publishing locally), the ``ldms`` library in the ``samp`` process does the
same thing as if the data were received from a remote peer: the ``cli``
client in the other plugin, which subscribes to all tags, gets the data (via
its ``cb_fn()``), and the remote client for ``agg`` also gets the data if
authorized.

CREDENTIALS AND PERMISSIONS
===========================

The ``ldms_msg_publish()`` function in C and the ``msg_publish()`` method in
Python both take a credential (``cred``) and a permission (``perm``). If
``cred`` is not set, the process' ``UID/GID`` are used. If a non-root user
tries to impersonate another user, the ``ldms`` library on the receiver side
drops the message. ``root`` is allowed to impersonate other ``UID/GID`` so
that the original user credentials are preserved when messages are
propagated.
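The two credential rules above (fall back to the process' own ``UID/GID``
when ``cred`` is not set, and drop a message whose credential differs from
the transport's unless the peer is ``root``) can be sketched as follows. The
``cred_sketch`` type and the ``effective_cred()`` and ``cred_accepted()``
functions are hypothetical stand-ins for ``ldms_cred_t`` and the library's
internal check; requiring both ``uid`` and ``gid`` to match is an assumption.

.. code:: c

    /* Hypothetical sketch of the credential rules; not the ldms source. */
    #include <stdbool.h>
    #include <stddef.h>
    #include <sys/types.h>
    #include <unistd.h>

    struct cred_sketch {          /* hypothetical stand-in for ldms_cred_t */
            uid_t uid;
            gid_t gid;
    };

    /* Publisher side: a NULL `cred` means "use this process' UID/GID". */
    struct cred_sketch effective_cred(const struct cred_sketch *cred)
    {
            struct cred_sketch self = { getuid(), getgid() };

            return cred ? *cred : self;
    }

    /* Receiver side: accept the message only if its credential matches
     * the peer's authenticated transport credential, or if the peer is
     * root (so that ldmsd can relay messages under the original user). */
    bool cred_accepted(const struct cred_sketch *msg,
                       const struct cred_sketch *xprt)
    {
            if (xprt->uid == 0)
                    return true;
            return msg->uid == xprt->uid && msg->gid == xprt->gid;
    }

Under these assumptions, a message claiming to be from ``alice`` but arriving
over ``bob``'s transport is rejected, while the same message relayed by a
``root`` ``ldmsd`` is accepted.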
Before forwarding a message to a remote client, the ``ldms`` library checks
whether the remote client's credential is allowed to see data published with
``cred`` and ``perm``.

CODE EXAMPLES
=============

C publish example
-----------------

.. code:: c

    #include "ldms.h"

    int main(int argc, char **argv)
    {
            static const char str_data[] = "data";
            static const char json_data[] = "{\"attr\":\"value\"}";
            ldms_t x;
            int rc;

            x = ldms_xprt_new_with_auth("sock", "munge", NULL);
            if (!x)
                    return 1;

            /* synchronous connect for simplicity (no callback given) */
            rc = ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL);
            if (rc)
                    return rc;

            /* publish to peer; sizeof() counts the terminating '\0' (5 bytes) */
            rc = ldms_msg_publish(x, "s0", LDMS_MSG_STRING, NULL, 0400,
                                  str_data, sizeof(str_data));
            if (rc)
                    return rc;

            /* publish to our own process (x == NULL); 17 bytes incl. '\0' */
            rc = ldms_msg_publish(NULL, "json_tag", LDMS_MSG_JSON, NULL, 0400,
                                  json_data, sizeof(json_data));
            return rc;
    }

C subscribe example
-------------------

.. code:: c

    #include <assert.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include "ldms.h"

    int cb_fn0(ldms_msg_event_t ev, void *cb_arg);
    int success_cb(ldms_msg_event_t ev, void *cb_arg);

    int main(int argc, char **argv)
    {
            int rc;
            ldms_t x;
            ldms_msg_client_t cli0;
            char *cli0_ctxt;

            /* connect to an ldmsd */
            x = ldms_xprt_new_with_auth("sock", "munge", NULL);
            if (!x)
                    return 1;
            rc = ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL);
            if (rc)
                    return rc;

            cli0_ctxt = malloc(1024); /* some application context for cli0 */
            if (!cli0_ctxt)
                    return 1;
            printf("cli0_ctxt: %p\n", (void *)cli0_ctxt);

            /* subscribe to "s0" messages that reach us; cb_fn0 is the
             * callback function */
            cli0 = ldms_msg_subscribe("s0", 0, cb_fn0, cli0_ctxt, "s0 only");
            if (!cli0) {
                    free(cli0_ctxt);
                    return 1;
            }

            /* Ask ldmsd to forward "s0" messages to us.
             * There will be NO status report callback since cb_fn is NULL.
             * A non-zero `rc` is a synchronous error that can still be
             * returned, e.g. EIO, ENOMEM, ENAMETOOLONG. */
            rc = ldms_msg_remote_subscribe(x, "s0", 0, NULL, NULL, LDMS_UNLIMITED);
            if (rc)
                    return rc;

            /* Ask ldmsd to forward messages with tags matching the "app.*"
             * regex to us. `success_cb()` will be called once we know the
             * result of the subscription. */
            rc = ldms_msg_remote_subscribe(x, "app.*", 1, success_cb, NULL,
                                           LDMS_UNLIMITED);
            if (rc)
                    return rc;

            sleep(10); /* sleep 10 sec */

            /* Request an unsubscription from the "s0" tag. Note that `match`
             * must match the subscription request. `success_cb` is called
             * to report the success/failure of the unsubscription request. */
            rc = ldms_msg_remote_unsubscribe(x, "s0", 0, success_cb, NULL);
            if (rc)
                    return rc;

            /* Request an unsubscription from the "app.*" tags. Note that
             * `match` must match the subscription request. `success_cb` is
             * called to report the success/failure of the unsubscription
             * request. */
            rc = ldms_msg_remote_unsubscribe(x, "app.*", 1, success_cb, NULL);
            if (rc)
                    return rc;

            ldms_msg_client_close(cli0);
            sleep(5); /* wait a bit so that we can see the events */
            return 0;
    }

    int cb_fn0(ldms_msg_event_t ev, void *cb_arg)
    {
            if (ev->type == LDMS_MSG_EVENT_CLIENT_CLOSE) {
                    /*
                     * The client is "closed". We can clean up resources
                     * associated with it here. No more events will occur
                     * on this client.
                     */
                    ldms_msg_client_t cli = ev->close.client;
                    printf("client closed:\n");
                    printf(" - match: %s\n", ldms_msg_client_match_get(cli));
                    printf(" - is_regex: %d\n", ldms_msg_client_is_regex_get(cli));
                    printf(" - desc: %s\n", ldms_msg_client_desc_get(cli));
                    printf("freeing cli0_ctxt: %p\n", cb_arg);
                    free(cb_arg);
                    return 0;
            }

            /* we expect RECV or CLIENT_CLOSE events only */
            assert(ev->type == LDMS_MSG_EVENT_RECV);

            if (ev->recv.type == LDMS_MSG_STRING) {
                    printf("msg_tag: %s\n", ev->recv.msg_tag);
                    printf("message data: %s\n", ev->recv.data);
            }
            if (ev->recv.type == LDMS_MSG_JSON) {
                    /* process `ev->recv.json` */
            }
            return 0;
    }

    int success_cb(ldms_msg_event_t ev, void *cb_arg)
    {
            switch (ev->type) {
            case LDMS_MSG_EVENT_SUBSCRIBE_STATUS:
                    printf("'%s' subscription status: %d\n",
                           ev->status.match, ev->status.status);
                    break;
            case LDMS_MSG_EVENT_UNSUBSCRIBE_STATUS:
                    printf("'%s' unsubscription status: %d\n",
                           ev->status.match, ev->status.status);
                    break;
            default:
                    printf("Unexpected event: %d\n", ev->type);
            }
            return 0;
    }

Python publish examples
-----------------------

.. code:: python

    from ovis_ldms import ldms

    x = ldms.Xprt(name="sock", auth="munge")  # LDMS socket transport with munge
    x.connect(host="localhost", port=411)

    # Explicitly specify the STRING type.
    x.msg_publish(msg_tag="s0", data="somedata",
                  msg_type=ldms.LDMS_MSG_STRING, perm=0o400)

    # JSON; the `dict` data will be converted to JSON
    x.msg_publish(msg_tag="s0", data={"attr": "value"},
                  msg_type=ldms.LDMS_MSG_JSON, perm=0o400)

    # STRING type is assumed if `msg_type` is omitted and data is `str` or `bytes`
    x.msg_publish(msg_tag="s0", data="somedata", perm=0o400)

    # JSON type is assumed if `msg_type` is omitted and data is a `dict`
    x.msg_publish(msg_tag="app0", data={"attr": "value"}, perm=0o400)

    # We can publish to our own process too
    ldms.msg_publish(msg_tag="s0", data="data")

Python subscribe examples
-------------------------

.. code:: python

    #!/usr/bin/python3

    import time
    from ovis_ldms import ldms

    x = ldms.Xprt(name="sock", auth="munge")  # LDMS socket transport with munge
    x.connect(host="node0", port=411)

    def msg_recv_cb(cli, sd, cb_arg):
        print(f"[{sd.msg_tag}]: {sd.data}")

    def msg_sub_status_cb(ev, cb_arg):
        print(f"'{ev.match}' subscription status: {ev.status}")

    def msg_unsub_status_cb(ev, cb_arg):
        print(f"'{ev.match}' unsubscription status: {ev.status}")

    # Subscribe to messages on the "s0" tag that reach our process.
    # `msg_recv_cb()` will be called when an "s0" message reaches our process.
    cli0 = ldms.MsgClient(match="s0", is_regex=False, cb=msg_recv_cb, cb_arg=None)

    # Subscribe to messages on tags matching "app.*" that reach our process.
    # Since no `cb` is given, "app.*" data that reach our process will be
    # stored in cli1.
    cli1 = ldms.MsgClient(match="app.*", is_regex=True)

    # Ask the peer to forward messages to us only on the "s0" tag.
    # The status result of the subscription will be reported via
    # `msg_sub_status_cb`.
    x.msg_subscribe("s0", is_regex=False, cb=msg_sub_status_cb, cb_arg=None)

    # Ask the peer to forward messages to us on tags matching "app.*".
    # Since no `cb` is given, this call blocks, waiting for the status
    # event. An exception is raised if the subscription request resulted in
    # an error.
    x.msg_subscribe("app.*", is_regex=True)
    print("app.* subscription status: 0")  # an exception is raised on failure

    time.sleep(10)  # wait a bit to get events

    # "s0" messages were handled by `msg_recv_cb`.
    # Data of "app.*" messages are stored in `cli1` since no `cb` was given.
    sd = cli1.get_data()
    while sd is not None:
        print(f"[{sd.msg_tag}]: {sd.data}")
        sd = cli1.get_data()

    # Ask the peer to cancel the "s0" subscription; the result is reported via `cb`
    x.msg_unsubscribe("s0", is_regex=False, cb=msg_unsub_status_cb, cb_arg=None)

    # Ask the peer to cancel the "app.*" subscription; block until the result
    # arrives, raising an exception on failure.
    x.msg_unsubscribe("app.*", is_regex=True)
    print("app.* unsubscription status: 0")  # an exception is raised on failure

    # Terminate the message clients and the connection
    cli0.close()
    cli1.close()
    x.close()

THREADING
=========

This section describes the threading model of the LDMS Message Service API
and what is safe to call from each context.

Thread Safety of the API
------------------------

``ldms_msg_subscribe()``, ``ldms_msg_client_close()``, and
``ldms_msg_publish()`` are all thread-safe. They use internal locks
(``__msg_rwlock`` and per-tag rwlocks) that are independent of any
application or ``ldmsd`` thread. These functions do not acquire any lock
owned by the caller.

Callback Invocation Thread
--------------------------

The thread that invokes ``cb_fn()`` depends on the origin of the message:

- For messages published within the same process (``ldms_msg_publish()``
  with ``x=NULL``), the callback is invoked inline by whichever thread called
  ``ldms_msg_publish()``, before ``ldms_msg_publish()`` returns.
- For messages arriving from a remote peer, the callback is invoked by the IO
  thread handling the incoming transport data.

No LDMS Message Service locks are held when ``cb_fn()`` is invoked, so all
``ldms_msg_*`` functions are safe to call from within the callback.

What Is Safe to Call from the Callback
--------------------------------------

Because no Message Service locks are held during callback invocation, the
following are safe to call from within ``cb_fn()``:

- ``ldms_msg_publish()``: to forward or re-publish a message
- ``ldms_msg_subscribe()``: to register a new subscription
- ``ldms_msg_client_close()``: to close the current or another subscription

**Lock discipline when publishing from the callback:**
``ldms_msg_publish(NULL, ...)`` invokes subscriber callbacks inline before
returning. If the caller holds an application-level lock when calling
``ldms_msg_publish()`` from within a callback, and any subscriber of the
published tag also tries to acquire the same lock, a deadlock results. Avoid
holding application-level locks across an ``ldms_msg_publish()`` call inside
a callback.

Subscription Teardown and the CLIENT_CLOSE Event
------------------------------------------------

``ldms_msg_client_close()`` begins teardown but returns before it is
complete. After it returns:

- No new ``LDMS_MSG_EVENT_RECV`` callbacks will be dispatched for this
  client.
- The ``LDMS_MSG_EVENT_CLIENT_CLOSE`` event will be delivered asynchronously
  by an internal thread (``ldms_strm_cls``). The application must not free
  resources used by the callback until this event arrives.
- The client handle must not be used after ``ldms_msg_client_close()`` is
  called.

``LDMS_MSG_EVENT_CLIENT_CLOSE`` is guaranteed to be the last event delivered
to the client. It is safe to free all resources associated with the
subscription at this point.

A typical teardown pattern using a condition variable:

.. code:: c

    #include <pthread.h>
    #include "ldms.h"

    struct my_ctxt_s {
            pthread_mutex_t lock;
            pthread_cond_t close_cond;
            ldms_msg_client_t client;  /* NULL once CLIENT_CLOSE arrived */
    };

    static int cb_fn(ldms_msg_event_t ev, void *cb_arg)
    {
            struct my_ctxt_s *ctxt = cb_arg;

            switch (ev->type) {
            case LDMS_MSG_EVENT_RECV:
                    /* process message */
                    break;
            case LDMS_MSG_EVENT_CLIENT_CLOSE:
                    /* Free resources associated with this subscription
                     * here, BEFORE signaling: once the waiter in
                     * close_client() wakes up, it may free `ctxt`. */
                    pthread_mutex_lock(&ctxt->lock);
                    ctxt->client = NULL;
                    pthread_cond_signal(&ctxt->close_cond);
                    pthread_mutex_unlock(&ctxt->lock);
                    /* Do not touch `ctxt` after this point. */
                    break;
            default:
                    break;
            }
            return 0;
    }

    static void close_client(struct my_ctxt_s *ctxt)
    {
            ldms_msg_client_t c;

            pthread_mutex_lock(&ctxt->lock);
            c = ctxt->client;
            pthread_mutex_unlock(&ctxt->lock);
            if (!c)
                    return;

            ldms_msg_client_close(c);

            pthread_mutex_lock(&ctxt->lock);
            while (ctxt->client != NULL)
                    pthread_cond_wait(&ctxt->close_cond, &ctxt->lock);
            pthread_mutex_unlock(&ctxt->lock);
            /* Safe: no more callbacks will be invoked for this client */
    }

SEE ALSO
========

**ldmsd_controller**\ (8), **ldms_msg_chan**\ (7)