.. _ldms_msg_chan:

=============
ldms_msg_chan
=============

--------------------------------------------------------
Resilient Publish/Subscribe LDMS Message Channel Service
--------------------------------------------------------

:Date: 2024-12-03
:Manual section: 7
:Manual group: LDMS
:Version: LDMS 4.5

OVERVIEW
========

A Message Channel is a reliable connected interface between one or more
peers. A Message Channel differs from a Message Client in the following
ways:

- A Message Client takes a previously established LDMS transport and uses
  this transport to communicate with a single peer.
- A Message Channel takes a
- A Message Channel will attempt to connect and reconnect automatically
  with a peer.
- Messages published on a channel are queued until the connection is
  established, up to a configurable buffer limit. The default queueing
  limit is 1 MB.
- A Message Channel can be instantiated in one of three modes:

  - PUBLISH

    A connection is established with a remote peer and messages can be
    published with ``ldms_msg_chan_publish()``.

  - SUBSCRIBE

    The message channel will listen for incoming connections. Channels
    can be subscribed to with ``ldms_msg_chan_subscribe()``.

  - BIDIR

    The channel is bi-directional and can both publish and subscribe.

ldmsd_controller commands
-------------------------

By default the Message Bus and Message Channel Services are disabled. The
LDMS Message Bus has to be enabled with the ``msg_enable`` command in
order for the LDMS Message Channel Service to work. This can also be done
with the ``ldms_msg_enable()`` C API or in Python with
``ldms.msg_enable()``.

.. parsed-literal::

   ``msg_enable``

   ``prdcr_subscribe`` ``regex``\ =\ `PRDCR_REGEX` ``message_channel``\ =\ `NAME_REGEX`

   ``prdcr_unsubscribe`` ``regex``\ =\ `PRDCR_REGEX` ``message_channel``\ =\ `NAME_REGEX`

C API
-----

.. code:: c

   #include "ldms_msg_chan.h"

   int ldms_msg_chan_publish(ldms_msg_chan_t chan, const char *tag,
                             uid_t uid, gid_t gid, uint32_t perm,
                             ldms_msg_type_t type, char *msg, size_t msg_len);

   int ldms_msg_chan_subscribe(ldms_msg_chan_t chan, const char *regex,
                               ldms_msg_event_cb_t msg_cb_fn, void *cb_arg);

   typedef int (*ldms_msg_event_cb_t)(ldms_msg_chan_event_t ev, void *cb_arg);

   int ldms_msg_chan_unsubscribe(ldms_msg_chan_t chan, const char *regex_s);

   ovis_log_t ldms_msg_chan_log(ldms_msg_chan_t chan);

   void ldms_msg_chan_set_q_limit(ldms_msg_chan_t chan, size_t q_limit);

   void ldms_msg_chan_stats(ldms_msg_chan_t chan, ldms_msg_chan_stats_t stats);

   const char *ldms_msg_chan_state_str(ldms_msg_chan_state_t state);

   const char *ldms_msg_chan_mode_str(ldms_msg_chan_mode_t mode);

   jbuf_t ldms_msg_chan_subscr_json(ldms_msg_chan_t chan);

   void ldms_msg_chan_close(ldms_msg_chan_t chan, int cancel);

   ldms_msg_chan_t ldms_msg_chan_find(const char *app_name);

   ldms_msg_chan_t ldms_msg_chan_list(const char *app_name);

   /* See "ldms_msg_chan.h" for the detailed API documentation */

Python APIs
-----------

The ``MessageChannel`` class implements a peer-to-peer, bidirectional
message interface over the LDMS Message Bus.

The interface is resilient; either side of the channel may be unavailable
at the time the object is created. The necessary LDMS transport resources
are created and managed automatically by the message channel.

The local peer listens for incoming Message Bus connect requests and
delivers data to subscribing MessageChannel subscribers.

The remote peer connects to a remote MessageChannel and sends the messages
published by MessageChannel clients to the remote peer.

The subscriber and publisher exchange data over different message buses so
that incoming (subscribed) and outgoing (published) messages cannot block
or otherwise interfere with one another.

The MessageChannel has an application name that may be useful when
reporting statistics for multiple channels. It has no functional impact on
the channel or its configuration. Multiple MessageChannel objects may have
the same name, although this is not recommended because it makes
per-channel statistics harder to tell apart.

The interface is designed to be very simple:

- ``ch = MessageChannel(...)``: create a new Message Channel
- ``ch.publish(...)``: publish a message
- ``ch.subscribe(...)``: subscribe to receive messages

There are additional methods to obtain statistics, affect message logging,
and set queueing levels as described below.

.. code:: python

   from ovis_ldms import ldms

   ldms.msg_publish(name=, msg_data=, msg_type=, perm=)

   xprt = ldms.Xprt()
   xprt.connect(host="node0", port=411)
   xprt.msg_publish(name=, msg_data=, msg_type=, perm=)
   xprt.msg_subscribe(match=, is_regex=)
   xprt.msg_unsubscribe(match=, is_regex=)

   cli = ldms.MsgClient(match=, is_regex=, cb=, cb_arg=)

   # MsgClient callback signature
   def cb(MsgClient client, MsgData data, object cb_arg)

   data = cli.get_data()
   cli.close()

   # for more detailed description and usage
   help(ldms)

DESCRIPTION
===========

LDMS Message Service is a service in LDMS for publishing variable-length
data to LDMS processes, and for receiving such data from LDMS processes
via message subscription. When published data arrives at an LDMS process,
the message clients (``msg_client``) in the process that are authorized to
see the data receive it via the callback function ``cb_fn()``. If there
are remote subscribers on the LDMS process, the data is forwarded to them
if they are allowed to see it.

An LDMS Daemon (``ldmsd``) has to be configured with ``prdcr_subscribe``
commands in order to receive message data from its producers (``prdcr``).
``prdcr_subscribe`` can be issued many times, e.g.

.. code:: sh

   # subscribe "s0" message channel on all producers
   prdcr_subscribe regex=.* message_channel=s0
   # subscribe "s1" message channel on all producers
   prdcr_subscribe regex=.* message_channel=s1

The ``message_channel`` parameter can also be a regular expression, e.g.

.. code:: sh

   # subscribe message channels matching "app.*" or "sys.*" on all producers
   prdcr_subscribe regex=.* message_channel=app.*
   prdcr_subscribe regex=.* message_channel=sys.*

This is the setup for the following figure:

- ``bob_app``: an application run by ``bob``. It LDMS-connects to
  ``samp``.
- ``samp``: an LDMS daemon (sampler).

  - A plugin in ``samp`` has an LDMS Message Client ``cli`` that
    subscribes to all channels (regex ``.*``).
  - Another plugin ``plug0`` in ``samp`` publishes to the ``s1`` channel.

- ``agg``: another LDMS daemon (aggregator). It has an LDMS connection to
  ``samp``.

  - ``agg`` subscribes to ``.*`` channels on ``samp`` with the following
    command:

    - ``prdcr_subscribe regex=samp message_channel=.*``

- ``alice_app``: an application run by alice that LDMS-connects to
  ``agg``.

  - ``alice_app`` subscribes to ``s0``.
  - ``alice_app`` has an LDMS Message Client ``cli`` that subscribes to
    the ``"my"`` channel.

The ``-->`` arrows illustrate possible message data paths.

::

                      ┌──────────────┐         ┌────────┐
    ┌───────────┐     │     samp     │         │  agg   │
    │bob_app    │     ├──────────────┤         ├────────┤
    ├───────────┤     │   .----.     │         │ .----. │
    │           │  .----->|ldms|---------------->|ldms| │
    │publish(s0)│  |  │   '----'<---.│         │ '----' │
    │     |     │  |  │     |       |│         └────|───┘
    │     v     │  |  │.----'       |│      .-------'
    │.----.     │  |  │| .------.   |│      | ┌────────────┐
    │|ldms|--------'  │| |cli:.*|   |│      | │ alice_app  │
    │'----'     │     │| |------|   |│      | ├────────────┤
    └───────────┘     │'>|cb_fn |   |│      | │   .----.   │
                      │  '------'   |│      '---->|ldms|--.│
                      │             |│        │   '----'  |│
                      │             |│        │           |│
                      │.-----------.|│        │           |│
                      │|   plug0   ||│        │  .------. |│
                      │|-----------||│        │  |cli:s0| |│
                      │|publish(s1)|'│        │  |------| |│
                      │'-----------' │        │  |cb_fn |<'│
                      └──────────────┘        │  '------'  │
                                              └────────────┘

``bob_app`` publishes a message by calling the ``ldms_msg_chan_publish()``
function. Let's assume that ``bob_app`` publishes an ``s0`` message over
the LDMS transport to ``samp`` with ``0400`` permission.

When the ``s0`` message from ``bob_app`` arrives at the ``samp`` daemon,
the logic in the ``ldms`` library does the following:

1. **Credential check**: The ``ldms`` library checks the credential in the
   message against the credential in the transport. If they are not the
   same, the message is dropped to prevent user impersonation. The
   exception is that ``root`` can impersonate any user so that ``ldmsd``
   daemons can propagate user messages on behalf of the user.
2. **Client iteration**: The ``ldms`` library goes through all clients
   that subscribe to the ``s0`` channel (including the matching clients
   that subscribe with a regular expression).
3. **Authorization check**: The ``ldms`` library then checks whether each
   client should see the data, using the credential information in the
   client and the credential and permission information in the message.
4. **Callback**: ``cb_fn()`` is called for the authorized clients.
   Examples of information available in the msg callback event are the
   message channel name, the message data, and the original publisher's
   ``uid``, ``gid`` and address.

Currently, a user can publish data to any channel. It is up to the
receiver side to decide what to do.

In this particular case, we will have 2 clients on ``samp``: the ``cli``
that subscribes to all channels (regex ``.*``), and a *hidden* client for
remote subscription (remote client for short) created when ``samp``
received a subscription request message from ``agg`` (by the
``prdcr_subscribe`` command in ``agg``). The ``cb_fn()`` of the remote
client is an internal function in the LDMS library that forwards the
message to the subscribing peer.

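The four receive-side steps listed above can be modeled in plain Python.
This is a minimal sketch, not the ``ldms`` library's code: the names
``Cred``, ``Client``, ``may_read()`` and ``deliver()`` are hypothetical,
the authorization rule is assumed to follow conventional Unix
owner/group/other read bits with ``root`` always allowed to read, and
regular-expression subscriptions are assumed to match unanchored.

.. code:: python

   import re
   from dataclasses import dataclass, field


   @dataclass
   class Cred:
       uid: int
       gid: int


   @dataclass
   class Client:
       match: str
       is_regex: bool
       cred: Cred
       received: list = field(default_factory=list)

       def matches(self, channel):
           if self.is_regex:
               return re.search(self.match, channel) is not None
           return self.match == channel


   def may_read(reader, owner, perm):
       """Assumed rule: Unix-style read bits; root may read everything."""
       if reader.uid == 0:
           return True
       if reader.uid == owner.uid:
           return bool(perm & 0o400)
       if reader.gid == owner.gid:
           return bool(perm & 0o040)
       return bool(perm & 0o004)


   def deliver(clients, xprt_cred, msg_cred, perm, channel, data):
       """Return how many clients received the message."""
       # 1. Credential check: only root may publish as another user.
       if xprt_cred.uid != 0 and xprt_cred != msg_cred:
           return 0
       delivered = 0
       for cli in clients:
           # 2. Client iteration: exact-name or regex match on the channel.
           if not cli.matches(channel):
               continue
           # 3. Authorization check against the message owner and perm.
           if not may_read(cli.cred, msg_cred, perm):
               continue
           # 4. Callback: modeled here as appending to a list.
           cli.received.append((channel, data))
           delivered += 1
       return delivered


   bob, alice, root = Cred(1000, 1000), Cred(1001, 1001), Cred(0, 0)
   cli_all = Client(".*", True, root)       # stands in for a root-owned client
   cli_alice = Client("s0", False, alice)   # stands in for alice's client
   print(deliver([cli_all, cli_alice], bob, bob, 0o400, "s0", "private"))
   print(deliver([cli_all, cli_alice], bob, bob, 0o444, "s0", "public"))

Under these assumptions a ``0400`` message from ``bob`` reaches only the
root-owned client, while a ``0444`` message reaches both, and a message in
which a non-root sender claims another user's credential is dropped before
any client is considered.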
Note that the credential of the remote client is the credential from the
LDMS transport authentication.

Now, the ``s0`` message has reached ``agg``, which has only one remote
client: ``alice_app`` subscribing to the ``s0`` channel. The ``ldms``
logic in ``agg`` will NOT forward this particular message to
``alice_app`` because ``bob_app``, the original publisher, set ``0400``
permission. If ``bob_app`` published another message on the ``s0`` channel
to ``samp`` with ``0444`` permission, it would be forwarded to
``alice_app`` once it reached ``agg``. ``cb_fn()`` on ``alice_app`` is
called once the ``s0`` data reaches it.

On another path, let's consider ``publish(s1)`` in the ``plug0`` plugin in
the ``samp`` process. When ``plug0`` publishes ``s1`` with a ``NULL``
transport (publishing locally), the ``ldms`` library in the ``samp``
process does the same thing as if the data were received from a remote
peer. The ``cli`` client in another plugin that subscribed to all channels
will get the data (via ``cb_fn()``), and the remote client to ``agg`` will
also get the data if authorized.

CREDENTIALS AND PERMISSIONS
===========================

The ``ldms_msg_chan_publish()`` function in C and the ``msg_publish()``
method in Python both receive a credential ``cred`` and a permission
``perm``. If ``cred`` is not set, the process' ``UID/GID`` are used. If a
non-root user tries to impersonate another user, the ``ldms`` library on
the receiver side will drop the message. ``root`` is allowed to
impersonate other ``UID/GID`` so that the original user's credential is
preserved when a message is propagated. Before a message is forwarded to a
remote client, the remote client's credential is checked to determine
whether it is allowed to see data owned by ``cred`` with ``perm``.

CODE EXAMPLES
=============

C example
---------

.. code:: c

   #include "ldms.h"
   #include "ldms_msg_chan.h" /* header named in the C API synopsis */

   /* Application callback; signature follows ldms_msg_event_cb_t. */
   static int subs_msg_cb(ldms_msg_chan_event_t ev, void *cb_arg)
   {
       return 0;
   }

   int main(int argc, char **argv)
   {
       int rc;

       /* - Publish to a remote peer named "remote_peer" to port 20001.
        * - Listen for incoming traffic on port 20002 on the interface
        *   that resolves to "my_name".
        * - Use the authentication method "munge"
        * - If the peer is not present or disconnects, attempt to reconnect
        *   every 6 seconds.
        */
       ldms_msg_chan_t chan =
           ldms_msg_chan_new("ldms_msg_chan_client", "bidir", "sock",
                             "remote_peer", 20001,
                             "my_name", 20002,
                             "munge", NULL, 6);

       /* - Get the log handle for this message channel
        * - Set the log level to "debug"
        */
       ovis_log_t log = ldms_msg_chan_log(chan);
       ovis_log_set_level(log, OVIS_LDEBUG);

       /* - Subscribe to all messages from the peer
        * - When messages arrive call subs_msg_cb()
        * - Pass chan to the subs_msg_cb() function
        */
       rc = ldms_msg_chan_subscribe(chan, ".*", subs_msg_cb, chan);
       return rc;
   }

C subscribe example
-------------------

.. code:: c

   #include <assert.h>
   #include <stdio.h>
   #include <stdlib.h>
   #include <unistd.h>
   #include "ldms.h"

   int cb_fn0(ldms_msg_chan_event_t ev, void *cb_arg);
   int success_cb(ldms_msg_chan_event_t ev, void *cb_arg);

   int main(int argc, char **argv)
   {
       int rc;
       ldms_t x;
       ldms_msg_chan_client_t cli0;
       char *cli0_ctxt;

       /* connect to an ldmsd */
       x = ldms_xprt_new_with_auth("sock", "munge", NULL);
       ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL);

       cli0_ctxt = malloc(1024); /* some application context for cli0 */
       printf("cli0_ctxt: %p\n", cli0_ctxt);

       /* subscribe "s0" messages that reached us; cb_fn0 is the callback function */
       cli0 = ldms_msg_chan_subscribe("s0", 0, cb_fn0, cli0_ctxt, "s0 only");

       /* Ask ldmsd to forward "s0" messages to us;
        * There will be NO success report callback since the function is `NULL`.
        * The non-zero `rc` is a synchronous error that can still be returned,
        * e.g. EIO, ENOMEM, ENAMETOOLONG. */
       rc = ldms_msg_chan_remote_subscribe(x, "s0", 0, NULL, NULL, LDMS_UNLIMITED);
       if (rc)
           return rc;

       /* ask ldmsd to forward messages with channels matching "app.*" regex to
        * us. `success_cb()` will be called once we know the result of the
        * subscription. */
       rc = ldms_msg_chan_remote_subscribe(x, "app.*", 1, success_cb, NULL, LDMS_UNLIMITED);
       if (rc)
           return rc;

       sleep(10); /* sleep 10 sec */

       /* Request an unsubscription to "s0" channel. Note that the `match` must
        * match the subscription request. The `success_cb` is called to report
        * the success/failed result of the unsubscription request. */
       rc = ldms_msg_chan_remote_unsubscribe(x, "s0", 0, success_cb, NULL);
       if (rc)
           return rc;

       /* Request an unsubscription to "app.*" channels. Note that the `match`
        * must match the subscription request. The `success_cb` is called to
        * report the success/failed result of the unsubscription request. */
       rc = ldms_msg_chan_remote_unsubscribe(x, "app.*", 1, success_cb, NULL);
       if (rc)
           return rc;

       ldms_msg_chan_client_close(cli0);

       sleep(5); /* wait a bit so that we can see the events */

       return 0;
   }

   int cb_fn0(ldms_msg_chan_event_t ev, void *cb_arg)
   {
       if (ev->type == LDMS_MSG_CHAN_EVENT_CLIENT_CLOSE) {
           /*
            * The client is "closed". We can clean up resources
            * associated with it here. No more event will occur
            * on this client.
            */
           ldms_msg_chan_client_t cli = ev->close.client;
           printf("client closed:\n");
           printf("  - match: %s\n", ldms_msg_chan_client_match_get(cli));
           printf("  - is_regex: %d\n", ldms_msg_chan_client_is_regex_get(cli));
           printf("  - desc: %s\n", ldms_msg_chan_client_desc_get(cli));
           printf("freeing cli0_ctxt: %p\n", cb_arg);
           free(cb_arg);
           return 0;
       }
       /* we expect RECV event or CLOSE event only */
       assert(ev->type == LDMS_MSG_CHAN_EVENT_RECV);
       if (ev->recv.type == LDMS_MSG_CHAN_STRING) {
           printf("channel name: %s\n", ev->recv.name);
           printf("message data: %s\n", ev->recv.data);
       }
       if (ev->recv.type == LDMS_MSG_CHAN_JSON) {
           /* process `ev->recv.json` */
       }
       return 0;
   }

   int success_cb(ldms_msg_chan_event_t ev, void *cb_arg)
   {
       switch (ev->type) {
       case LDMS_MSG_CHAN_EVENT_SUBSCRIBE_STATUS:
           printf("'%s' subscription status: %d\n",
                  ev->status.match, ev->status.status);
           break;
       case LDMS_MSG_CHAN_EVENT_UNSUBSCRIBE_STATUS:
           printf("'%s' unsubscription status: %d\n",
                  ev->status.match, ev->status.status);
           break;
       default:
           printf("Unexpected event: %d\n", ev->type);
       }
       return 0;
   }

Python publish examples
-----------------------

.. code:: python

   from ovis_ldms import ldms

   x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport w/ munge
   x.connect(host="localhost", port=411)

   # Explicitly specify STRING type.
   x.msg_publish(name="s0", data="somedata",
                 msg_type=ldms.LDMS_MSG_CHAN_STRING, perm=0o400)

   # JSON; the `dict` data will be converted to JSON
   x.msg_publish(name="s0", data={"attr": "value"},
                 msg_type=ldms.LDMS_MSG_CHAN_JSON, perm=0o400)

   # Assumed STRING type if data is `str` or `bytes` when `msg_type` is omitted
   x.msg_publish(name="s0", data="somedata", perm=0o400)

   # Assumed JSON type if data is `dict` when `msg_type` is omitted
   x.msg_publish(name="app0", data={"attr": "value"}, perm=0o400)

   # We can publish to our process too
   ldms.msg_publish(name="s0", data="data")

Python subscribe examples
-------------------------

.. code:: python

   #!/usr/bin/python3

   import time
   from ovis_ldms import ldms

   x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport w/ munge
   x.connect(host="node0", port=411)

   def msg_recv_cb(cli, sd, cb_arg):
       print(f"[{sd.name}]: {sd.data}")

   def msg_sub_status_cb(ev, cb_arg):
       print(f"'{ev.match}' subscription status: {ev.status}")

   def msg_unsub_status_cb(ev, cb_arg):
       print(f"'{ev.match}' unsubscription status: {ev.status}")

   # Subscribe to messages that reach our process on the "s0" channel.
   # `msg_recv_cb()` will be called when an "s0" message reaches our process.
   cli0 = ldms.MsgClient(match="s0", is_regex=False,
                         cb=msg_recv_cb, cb_arg=None)

   # Subscribe to messages that reach our process on channels matching "app.*".
   # Since no `cb` is given, "app.*" data that reaches our process will be
   # stored in cli1.
   cli1 = ldms.MsgClient(match="app.*", is_regex=True)

   # Request peer for message forwarding to us only on channel "s0".
   # The status result of the subscription will be notified via
   # `msg_sub_status_cb`.
   x.msg_subscribe("s0", is_regex=False, cb=msg_sub_status_cb, cb_arg=None)

   # Request peer for message forwarding to us on channels matching "app.*".
   # Since no `cb` is given, this call becomes blocking, waiting for the status
   # event. An exception is raised if the subscription request resulted in an
   # error.
   x.msg_subscribe("app.*", is_regex=True)
   print("app.* subscription status: 0") # b/c an exception is raised if failed

   time.sleep(10) # wait a bit to get events

   # "s0" messages were handled by `msg_recv_cb`.
   # Data of "app.*" messages are stored in `cli1` since no `cb` was given.
   sd = cli1.get_data()
   while sd is not None:
       print(f"[{sd.name}]: {sd.data}")
       sd = cli1.get_data()

   # Request "s0" subscription cancellation to peer; notify result via `cb`
   x.msg_unsubscribe("s0", is_regex=False, cb=msg_unsub_status_cb, cb_arg=None)

   # Request "app.*" subscription cancellation to peer; block-wait for result,
   # raising an exception if failed.
   x.msg_unsubscribe("app.*", is_regex=True)
   print("app.* unsubscription status: 0") # b/c an exception is raised if failed

   # Terminate message clients and the connection
   cli0.close()
   cli1.close()
   x.close()

THREADING
=========

This section describes the threading model of the Message Channel API and
what is safe to call from each context.

Thread Safety of the API
------------------------

All ``ldms_msg_chan_*`` functions are thread-safe. They use a per-channel
mutex and the global channel lock, neither of which conflicts with
application or ldmsd threads.

Internal Threads
----------------

Creating the first message channel in a process launches a scheduler
thread shared across all channels process-wide. Each channel also gets its
own I/O thread. These threads manage connection attempts, reconnection,
and flushing the outbound message queue to the peer.

Blocking Behavior of ldms_msg_chan_publish()
--------------------------------------------

``ldms_msg_chan_publish()`` copies the message into the outbound queue and
returns immediately. If the peer is not yet connected, messages accumulate
in the queue and are sent once the connection is established. If adding
the message would exceed the queue limit, ``ldms_msg_chan_publish()``
returns ``ENOBUFS`` and the message is not queued. The default queue limit
is 1 MB and can be adjusted with ``ldms_msg_chan_set_q_limit()``.

Blocking Behavior of ldms_msg_chan_close()
------------------------------------------

With ``cancel=0``, ``ldms_msg_chan_close()`` blocks until all queued
messages have been delivered to the peer and the channel is fully closed.
With ``cancel=1``, queued messages are abandoned and the channel closes
immediately.

Callback Invocation Thread
--------------------------

The callback registered with ``ldms_msg_chan_subscribe()`` is always
invoked by an I/O thread. A message channel only receives messages from a
remote peer over a transport connection; there is no local in-process
delivery path in ``ldms_msg_chan``. No LDMS Message Bus locks are held
when the callback is invoked.

What Is Safe to Call from the Callback
--------------------------------------

Because no Message Bus locks are held during callback invocation, the
following ``ldms_msg_chan_*`` functions are safe to call from within the
callback:

- ``ldms_msg_chan_publish()``: publishes a message to the remote peer.
  ``ldms_msg_chan_publish()`` always delivers to the remote peer over the
  transport and never triggers inline local callback invocation, so there
  is no deadlock risk from calling it within a callback.
- ``ldms_msg_chan_subscribe()``: registers a new subscription on a
  channel, provided the channel was created in subscribe or bidir mode.
- ``ldms_msg_chan_new()``: creates a new channel.
- ``ldms_msg_chan_set_q_limit()``: adjusts the queue limit.
- ``ldms_msg_chan_close()``: safe from a locking standpoint, but calling
  it with ``cancel=0`` from within the callback blocks the I/O thread
  until all queued messages have been delivered and all subscriptions
  have been torn down. Use ``cancel=1`` if closing from within a
  callback.

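The queue-limit and close semantics referenced in this section can be
illustrated with a toy model in plain Python. It is a sketch under stated
assumptions, not the channel implementation: ``ToyChannel`` and its
methods are hypothetical, delivery is simulated by moving messages to a
``sent`` list, and ``close(cancel=False)`` drains the queue inline where
the real call would block until the peer has received everything.

.. code:: python

   import errno
   from collections import deque


   class ToyChannel:
       """Hypothetical stand-in for a message channel's outbound queue."""

       def __init__(self, q_limit=1024 * 1024):  # documented default: 1 MB
           self.q_limit = q_limit
           self.q_bytes = 0      # bytes currently queued
           self.queue = deque()
           self.sent = []        # messages "delivered" to the peer
           self.connected = False

       def publish(self, msg):
           """Queue a copy of msg without blocking; return 0 or ENOBUFS."""
           if self.q_bytes + len(msg) > self.q_limit:
               return errno.ENOBUFS  # rejected, not queued
           self.queue.append(bytes(msg))
           self.q_bytes += len(msg)
           if self.connected:
               self._flush()
           return 0

       def _flush(self):
           """Simulate the I/O thread sending everything that is queued."""
           while self.queue:
               msg = self.queue.popleft()
               self.q_bytes -= len(msg)
               self.sent.append(msg)

       def on_connect(self):
           """Simulate the peer becoming reachable."""
           self.connected = True
           self._flush()

       def close(self, cancel):
           """Close the channel; return the number of abandoned messages."""
           abandoned = 0
           if cancel:
               abandoned = len(self.queue)
               self.queue.clear()
               self.q_bytes = 0
           else:
               self._flush()  # the real call would block here instead
           self.connected = False
           return abandoned


   ch = ToyChannel(q_limit=10)
   rc_fits = ch.publish(b"12345678")  # 8 bytes queued while disconnected
   rc_full = ch.publish(b"abcd")      # 8 + 4 > 10, so rejected
   ch.on_connect()                    # queued message is flushed
   rc_retry = ch.publish(b"abcd")     # queue is empty again, so accepted
   print(rc_fits, rc_full == errno.ENOBUFS, rc_retry, ch.sent)

The model also shows why ``cancel=1`` suits a close issued from a
callback: it returns without waiting on the queue, whereas the
``cancel=0`` path has to finish delivery first.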
Note on CLIENT_CLOSE
--------------------

The ``LDMS_MSG_EVENT_CLIENT_CLOSE`` event is not delivered to the
application callback when using ``ldms_msg_chan_subscribe()``; it is
handled internally by the channel. No teardown action is needed in the
callback for subscription cleanup.

SEE ALSO
========

**ldmsd_controller**\ (8), **ldms_msg**\ (7)