ldms_msg_chan

Resilient Publish/Subscribe LDMS Message Channel Service

Date:

2024-12-03

Manual section:

7

Manual group:

LDMS

Version:

LDMS 4.5

OVERVIEW

A Message Channel is a reliable connected interface between one or more peers. A Message Channel differs from a Message Client in the following way:

  • A Message Client takes a previously established LDMS transport and uses this transport to communicate with a single peer.

  • A Message Channel takes a

  • A Message Channel will attempt to connect and reconnect automatically with a peer.

  • Messages published on a channel are queued until the connection is established up until a configurable buffer limit is reached. The default queueing limit is 1MB.

  • A Message Channel can be instantiated in one of three modes:

    • PUBLISH A connection is established with a remote peer and messages can be published with ldms_msg_chan_publish().

    • SUBSCRIBE The message channel will listen for incoming connections. Channels can be subscribed to with ldms_msg_chan_subscribe().

    • BIDIR The channel is bi-directional and can both publish and subscribe.
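
The automatic connect-and-reconnect behaviour listed above can be pictured as a retry loop with a fixed interval. The following is a minimal Python sketch of that idea, not the library's implementation: connect_with_retry(), try_connect, and the injectable sleep parameter are hypothetical names used only for illustration, and the attempt cap exists only to keep the sketch finite.

```python
import time


def connect_with_retry(try_connect, interval_s, max_attempts, sleep=time.sleep):
    """Call try_connect() until it succeeds or max_attempts is reached.

    Returns the number of attempts made on success, or None if every
    attempt failed. The cap is an assumption of this sketch; this manual
    does not state whether a real channel ever gives up.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            try_connect()
            return attempt
        except ConnectionError:
            if attempt < max_attempts:
                sleep(interval_s)  # wait before the next attempt
    return None


# Demo with a fake peer that becomes reachable on the third attempt.
calls = {"n": 0}


def fake_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("peer not available yet")


waits = []  # records the requested delays instead of really sleeping
attempts = connect_with_retry(fake_connect, interval_s=6, max_attempts=5,
                              sleep=waits.append)
print(attempts, waits)
```

Passing the sleep function as a parameter keeps the loop testable without real delays; the 6-second interval mirrors the reconnect interval used in the C example later in this page.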

ldmsd_controller commands

By default, the Message Bus and Message Channel Services are disabled. The LDMS Message Bus has to be enabled with the msg_enable command in order for the LDMS Message Channel Service to work. This can also be done with the ldms_msg_enable() C API or in Python with ldms.msg_enable().

msg_enable

prdcr_subscribe regex=PRDCR_REGEX message_channel=NAME_REGEX

prdcr_unsubscribe regex=PRDCR_REGEX message_channel=NAME_REGEX

C API

#include "ldms_msg_chan.h"

int ldms_msg_chan_publish(ldms_msg_chan_t chan, const char *tag,
                          uid_t uid, gid_t gid, uint32_t perm,
                          ldms_msg_type_t type, char *msg, size_t msg_len);

int ldms_msg_chan_subscribe(ldms_msg_chan_t chan, const char *regex,
                            ldms_msg_event_cb_t msg_cb_fn, void *cb_arg);

typedef int (*ldms_msg_event_cb_t)(ldms_msg_chan_event_t ev, void *cb_arg);

int ldms_msg_chan_unsubscribe(ldms_msg_chan_t chan, const char *regex_s);

ovis_log_t ldms_msg_chan_log(ldms_msg_chan_t chan);

void ldms_msg_chan_set_q_limit(ldms_msg_chan_t chan, size_t q_limit);

void ldms_msg_chan_stats(ldms_msg_chan_t chan, ldms_msg_chan_stats_t stats);

const char *ldms_msg_chan_state_str(ldms_msg_chan_state_t state);

const char *ldms_msg_chan_mode_str(ldms_msg_chan_mode_t mode);

jbuf_t ldms_msg_chan_subscr_json(ldms_msg_chan_t chan);

void ldms_msg_chan_close(ldms_msg_chan_t chan, int cancel);

ldms_msg_chan_t ldms_msg_chan_find(const char *app_name);

ldms_msg_chan_t ldms_msg_chan_list(const char *app_name);

/* See "ldms_msg_chan.h" for the detailed API documentation */

Python APIs

The MessageChannel class implements a bidirectional message interface over the LDMS Message Bus.

The MessageChannel implements a peer to peer, bidirectional message bus interface. The interface is resilient; either side of the channel may be unavailable at the time the object is created. The necessary LDMS transport resources are created and managed automatically by the message channel.

The local peer listens for incoming Message Bus connect requests and delivers data to subscribing MessageChannel subscribers. The remote peer connects to a remote MessageChannel and sends the messages published by MessageChannel clients to the remote peer. The subscriber and publisher exchange data over different message buses such that incoming (subscribed) and outgoing (published) messages cannot block or otherwise interfere with one another.

The MessageChannel has an application name that may be useful when reporting statistics for multiple channels. It has no functional impact on the channel or its configuration. Multiple MessageChannel objects may have the same name, although this may not be recommended for subsequent statistics gathering.

The interface is designed to be very simple:

  • ch = MessageChannel(…) # Create a new Message Channel

  • ch.publish(…) # Publish a message

  • ch.subscribe(…) # Subscribe to receive messages

There are additional methods to obtain statistics, affect message logging, and set queueing levels as described below.

from ovis_ldms import ldms

ldms.msg_publish(name=<str>, msg_data=<str|dict>,
            msg_type=<None|ldms.LDMS_MSG_CHAN_STRING|ldms.LDMS_MSG_CHAN_JSON>,
            perm=<int>)

xprt = ldms.Xprt()
xprt.connect(host="node0", port=411)

xprt.msg_publish(name=<str>, msg_data=<str|dict>,
            msg_type=<None|ldms.LDMS_MSG_CHAN_STRING|ldms.LDMS_MSG_CHAN_JSON>,
            perm=<int>)

xprt.msg_subscribe(match=<str>, is_regex=<bool>)

xprt.msg_unsubscribe(match=<str>, is_regex=<bool>)

cli = ldms.MsgClient(match=<str>, is_regex=<bool>, cb=<callable|None>,
                        cb_arg=<object|None>)
# MsgClient callback signature
def cb(MsgClient client, MsgData data, object cb_arg)

data = cli.get_data()

cli.close()

# for more detailed description and usage
help(ldms)

DESCRIPTION

LDMS Message Service is a service in LDMS for publishing variable-length data to LDMS processes, and for receiving such data from LDMS processes via message subscription. When the published data arrive at an LDMS process, the message clients in the process that are authorized to see the data will receive the data via the callback function cb_fn(). If there are remote subscribers on the LDMS process, the data will be forwarded to them if they are allowed to see the data.

An LDMS Daemon (ldmsd) has to be configured with prdcr_subscribe commands in order to receive message data from its producers (prdcr). prdcr_subscribe can be issued many times, e.g.

# subscribe "s0" message channel on all producers
prdcr_subscribe regex=.* message_channel=s0
# subscribe "s1" message channel on all producers
prdcr_subscribe regex=.* message_channel=s1

The message_channel parameter can also be a regular expression, e.g.

# subscribe message channels matching "app.*" or "sys.*" on all producers
prdcr_subscribe regex=.* message_channel=app.*
prdcr_subscribe regex=.* message_channel=sys.*
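
As a rough illustration of which channel names such patterns select, here is a small Python sketch. It assumes that a pattern must match the whole channel name and it uses Python's re module; the regular-expression dialect and anchoring rules actually used by LDMS are not stated in this page and may differ. matching_channels() and the channel names are hypothetical.

```python
import re


def matching_channels(pattern, names):
    """Return the channel names that the pattern matches in full.

    Whole-name matching is an assumption of this sketch, not a statement
    about how LDMS evaluates message_channel expressions.
    """
    rx = re.compile(pattern)
    return [n for n in names if rx.fullmatch(n)]


# Hypothetical channel names published somewhere in the system.
names = ["s0", "s1", "app0", "app.io", "sys.mem", "myapp"]

app_channels = matching_channels("app.*", names)
sys_channels = matching_channels("sys.*", names)
all_channels = matching_channels(".*", names)

print(app_channels)
print(sys_channels)
```

Under an unanchored search, "myapp" would also be selected by app.*, so it is worth checking the behaviour of the deployed version before relying on a pattern for filtering.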

This is the setup for the following figure:

  • bob_app: an application run by bob. It LDMS-connects to samp.

  • samp: an LDMS daemon (sampler).

    • A plugin in samp has an LDMS Message Client cli that subscribes to all channels (regex .*).

    • Another plugin plug0 in samp publishes to s1 channel.

  • agg: another LDMS daemon (aggregator). It has an LDMS connection to samp.

    • agg subscribes to .* channels on samp with the following command:

      • prdcr_subscribe regex=samp message_channel=.*

  • alice_app: an application run by alice that LDMS-connects to agg.

    • alice_app subscribes to s0

    • alice_app has an LDMS Message Client cli that subscribes to "my" channel.

The --> arrows illustrate possible message data paths.

                  ┌──────────────┐         ┌────────┐
┌───────────┐     │     samp     │         │  agg   │
│bob_app    │     ├──────────────┤         ├────────┤
├───────────┤     │   .----.     │         │ .----. │
│           │  .----->|ldms|---------------->|ldms| │
│publish(s0)│  |  │   '----'<---.│         │ '----' │
│  |        │  |  │     |       |│         └────|───┘
│  v        │  |  │.----'       |│      .-------'
│.----.     │  |  │| .------.   |│      | ┌────────────┐
│|ldms|--------'  │| |cli:.*|   |│      | │ alice_app  │
│'----'     │     │| |------|   |│      | ├────────────┤
└───────────┘     │'>|cb_fn |   |│      | │   .----.   │
                  │  '------'   |│      '---->|ldms|--.│
                  │             |│        │   '----'  |│
                  │             |│        │           |│
                  │.-----------.|│        │           |│
                  │|  plug0    ||│        │  .------. |│
                  │|-----------||│        │  |cli:s0| |│
                  │|publish(s1)|'│        │  |------| |│
                  │'-----------' │        │  |cb_fn |<'│
                  └──────────────┘        │  '------'  │
                                          └────────────┘

bob_app publishes a message by calling the ldms_msg_chan_publish() function. Let’s assume that bob_app publishes an s0 message over the LDMS transport to samp with 0400 permission.

When the s0 message from bob_app arrives at the samp daemon, the logic in the ldms library does the following:

  1. Credential check: the ldms library checks the credential in the message against the credential in the transport. If they are not the same, the message is dropped to prevent user impersonation. The exception is that root can impersonate any user, so that ldmsd instances can propagate user messages on behalf of the user.

  2. Client iteration: the ldms library goes through all clients that subscribe to the s0 channel (including the matching clients that subscribe with a regular expression).

  3. Authorization check: the ldms library then checks whether each client is allowed to see the data, using the credential information in the client and the credential and permission information in the message.

  4. Callback: cb_fn() is called for the authorized clients. Examples of information available in the msg callback event are the message channel name, the message data, and the original publisher’s uid, gid and address. Currently, a user can publish data to any channel. It is up to the receiver side to decide what to do.
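
The four steps can be sketched as a small in-process model. This is a hedged Python illustration, not the ldms library's code: Message, Client, deliver() and the pluggable authorized policy are hypothetical names, uid 0 stands for root, and whole-name regular-expression matching is an assumption.

```python
import re
from dataclasses import dataclass, field


@dataclass
class Message:
    channel: str
    data: str
    uid: int  # credential claimed inside the message
    gid: int


@dataclass
class Client:
    match: str
    is_regex: bool
    received: list = field(default_factory=list)

    def wants(self, channel):
        """True if this client subscribed to the given channel."""
        if self.is_regex:
            return re.fullmatch(self.match, channel) is not None
        return self.match == channel

    def cb_fn(self, msg):
        self.received.append(msg.data)


def deliver(msg, xprt_uid, xprt_gid, clients, authorized):
    """Run the four steps; return the number of callbacks invoked."""
    # 1. Credential check: only root (uid 0) may carry another identity.
    if xprt_uid != 0 and (msg.uid, msg.gid) != (xprt_uid, xprt_gid):
        return 0  # message dropped
    delivered = 0
    for cli in clients:              # 2. Client iteration
        if not cli.wants(msg.channel):
            continue
        if not authorized(cli, msg):  # 3. Authorization check
            continue
        cli.cb_fn(msg)                # 4. Callback
        delivered += 1
    return delivered


def allow_all(cli, msg):
    return True  # placeholder policy for this demo only


clients = [Client(".*", True), Client("s1", False)]
msg = Message("s0", "hello", uid=1000, gid=1000)

honest = deliver(msg, 1000, 1000, clients, allow_all)   # same identity
spoofed = deliver(msg, 1001, 1001, clients, allow_all)  # dropped
by_root = deliver(msg, 0, 0, clients, allow_all)        # root may forward
print(honest, spoofed, by_root)
```

Only the .* client receives the s0 message; the s1-only client is skipped in step 2, and the message sent over a transport with a different non-root identity never reaches step 2 at all.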

In this particular case, we will have 2 clients on samp: the cli that subscribes for all channels (regex .*), and a hidden client for remote subscription (remote client for short) created when samp received a subscription request message from agg (by prdcr_subscribe command in agg). The cb_fn() of the remote client is an internal function in LDMS library that forwards the message to the subscribing peer. Note that the credential of the remote client is the credential from the LDMS transport authentication.

Now, the s0 message has reached agg, which has only one remote client: alice_app subscribing to the s0 channel. The ldms logic in agg will NOT forward this particular message to alice_app because bob_app, the original publisher, set 0400 permission.

If bob_app published another message on the s0 channel to samp with 0444 permission, agg would forward it to alice_app when it arrived. cb_fn() on alice_app is called once the s0 data reaches it.
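
The difference between 0400 and 0444 in this scenario can be sketched with a file-style permission check. This Python fragment is an illustration under stated assumptions: it interprets perm as Unix owner/group/other read bits and lets root read everything, neither of which this page spells out; may_read() and the numeric uids/gids are hypothetical.

```python
def may_read(perm, owner_uid, owner_gid, reader_uid, reader_gid):
    """Decide whether a reader may see a message with the given perm.

    Assumes Unix file-style semantics: owner, group and other read bits
    (0o400, 0o040, 0o004). Treating root as always authorized is also an
    assumption of this sketch.
    """
    if reader_uid == 0:
        return True
    if reader_uid == owner_uid:
        return bool(perm & 0o400)
    if reader_gid == owner_gid:
        return bool(perm & 0o040)
    return bool(perm & 0o004)


BOB = (1000, 1000)    # hypothetical uid, gid of bob
ALICE = (1001, 1001)  # hypothetical uid, gid of alice

alice_sees_0400 = may_read(0o400, *BOB, *ALICE)
alice_sees_0444 = may_read(0o444, *BOB, *ALICE)
bob_sees_0400 = may_read(0o400, *BOB, *BOB)
print(alice_sees_0400, alice_sees_0444, bob_sees_0400)
```

With these assumptions, alice is refused the 0400 message and receives the 0444 one, which matches the outcome described in the text.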

On another path, let’s consider publish(s1) in plug0 plugin in samp process. When plug0 publishes s1 with NULL transport (publishing locally), the ldms library in samp process does the same thing as if the data were received from a remote peer. The cli client in another plugin that subscribed for all channels will get the data (via cb_fn()), and the remote client to agg will also get the data if authorized.

CREDENTIALS AND PERMISSIONS

The ldms_msg_chan_publish() function in C and the msg_publish() method in Python both receive credential cred and permission perm. If cred is not set, the process’ UID/GID are used. If a non-root user tries to impersonate another user, the ldms library on the receiver side will drop the message. Root is allowed to impersonate other UID/GID so that users’ messages can be preserved when propagated. Before the message is forwarded to the remote client, the remote client's credential is checked to determine whether it is allowed to see the data from cred with perm.

CODE EXAMPLES

C example

#include "ldms.h"
#include "ldms_msg_chan.h"

/* Minimal message callback; a real application would inspect ev here. */
static int subs_msg_cb(ldms_msg_chan_event_t ev, void *cb_arg)
{
    return 0;
}

int main(int argc, char **argv)
{
    int rc;

    /* - Publish to a remote peer named "remote_peer" on port 20001.
     * - Listen for incoming traffic on port 20002 on the interface
     *   that resolves to "my_name".
     * - Use the authentication method "munge".
     * - If the peer is not present or disconnects, attempt to reconnect
     *   every 6 seconds.
     */
    ldms_msg_chan_t chan =
            ldms_msg_chan_new("ldms_msg_chan_client",
                              "bidir",       "sock",
                              "remote_peer", 20001,
                              "my_name",     20002,
                              "munge",       NULL,
                              6);

    /* - Get the log handle for this message channel.
     * - Set the log level to "debug".
     */
    ovis_log_t log = ldms_msg_chan_log(chan);
    ovis_log_set_level(log, OVIS_LDEBUG);

    /* - Subscribe to all messages from the peer.
     * - When messages arrive, call subs_msg_cb().
     * - Pass chan to the subs_msg_cb() function.
     */
    rc = ldms_msg_chan_subscribe(chan, ".*", subs_msg_cb, chan);

    return rc;
}

C subscribe example

#include <assert.h>   /* assert() */
#include <stdio.h>
#include <stdlib.h>   /* malloc(), free() */
#include <unistd.h>
#include "ldms.h"

int cb_fn0(ldms_msg_chan_event_t ev, void *cb_arg);
int success_cb(ldms_msg_chan_event_t ev, void *cb_arg);

int main(int argc, char **argv)
{
    int rc;
    ldms_t x;
    ldms_msg_chan_client_t cli0;
    char *cli0_ctxt;

    /* connect to an ldmsd */
    x = ldms_xprt_new_with_auth("sock", "munge", NULL);
    ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL);

    cli0_ctxt = malloc(1024); /* some application context for cli0 */
    printf("cli0_ctxt: %p\n", cli0_ctxt);

    /* subscribe "s0" messages that reached us; cb_fn0 is the callback function */
    cli0 = ldms_msg_chan_subscribe("s0", 0, cb_fn0, cli0_ctxt, "s0 only");

    /* Ask ldmsd to forward "s0" messages to us;
     * There will be NO success report callback since the function is `NULL`. */
    rc = ldms_msg_chan_remote_subscribe(x, "s0", 0, NULL, NULL, LDMS_UNLIMITED);
    if (rc)
        return rc;
    /* The non-zero `rc` is a synchronous error that can still be returned,
     * e.g. EIO, ENOMEM, ENAMETOOLONG. */

    /* ask ldmsd to forward messages with channels matching "app.*" regex to
     * us.  `success_cb()` will be called once we know the result of the
     * subscription. */
    rc = ldms_msg_chan_remote_subscribe(x, "app.*", 1, success_cb, NULL, LDMS_UNLIMITED);
    if (rc)
        return rc;

    sleep(10); /* sleep 10 sec */

    /* Request an unsubscription to "s0" channel. Note that the `match` must
     * match the subscription request. The `success_cb` is called to report
     * the success/failed result of the unsubscription request. */
    rc = ldms_msg_chan_remote_unsubscribe(x, "s0", 0, success_cb, NULL);
    if (rc)
        return rc;

    /* Request an unsubscription to "app.*" channels. Note that the `match`
     * must match the subscription request. The `success_cb` is called to
     * report the success/failed result of the unsubscription request. */
    rc = ldms_msg_chan_remote_unsubscribe(x, "app.*", 1, success_cb, NULL);
    if (rc)
        return rc;

    ldms_msg_chan_client_close(cli0);

    sleep(5); /* wait a bit so that we can see the events */

    return 0;
}

int cb_fn0(ldms_msg_chan_event_t ev, void *cb_arg)
{
    if (ev->type == LDMS_MSG_CHAN_EVENT_CLIENT_CLOSE) {
        /*
         * The client is "closed". We can clean up resources
         * associated with it here. No more event will occur
         * on this client.
         */
        ldms_msg_chan_client_t cli = ev->close.client;
        printf("client closed:\n");
        printf(" - match: %s\n", ldms_msg_chan_client_match_get(cli));
        printf(" - is_regex: %d\n", ldms_msg_chan_client_is_regex_get(cli));
        printf(" - desc: %s\n", ldms_msg_chan_client_desc_get(cli));
        printf("freeing cli0_ctxt: %p\n", cb_arg);
        free(cb_arg);
        return 0;
    }
    assert(ev->type == LDMS_MSG_CHAN_EVENT_RECV);
    /* we expect RECV event or CLOSE event only */
    if (ev->recv.type == LDMS_MSG_CHAN_STRING) {
        printf("channel name: %s\n", ev->recv.name);
        printf("message data: %s\n", ev->recv.data);
    }
    if (ev->recv.type == LDMS_MSG_CHAN_JSON) {
        /* process `ev->recv.json` */
    }
    return 0;
}

int success_cb(ldms_msg_chan_event_t ev, void *cb_arg)
{
    switch (ev->type) {
    case LDMS_MSG_CHAN_EVENT_SUBSCRIBE_STATUS:
        printf("'%s' subscription status: %d\n", ev->status.match,
                                                        ev->status.status);
        break;
    case LDMS_MSG_CHAN_EVENT_UNSUBSCRIBE_STATUS:
        printf("'%s' unsubscription status: %d\n", ev->status.match,
                                                          ev->status.status);
        break;
    default:
        printf("Unexpected event: %d\n", ev->type);
    }
    return 0;
}

Python publish examples

from ovis_ldms import ldms
x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport w/ munge
x.connect(host="localhost", port=411)

# Explicitly specify STRING type.
x.msg_publish(name="s0", data="somedata", msg_type=ldms.LDMS_MSG_CHAN_STRING,
              perm=0o400)

# JSON; the `dict` data will be converted to JSON
x.msg_publish(name="s0", data={"attr": "value"}, msg_type=ldms.LDMS_MSG_CHAN_JSON,
              perm=0o400)

# Assumed STRING type if data is `str` or `bytes` when `msg_type` is omitted
x.msg_publish(name="s0", data="somedata", perm=0o400)

# Assumed JSON type if data is `dict` when `msg_type` is omitted
x.msg_publish(name="app0", data={"attr": "value"}, perm=0o400)

# We can publish to our process too
ldms.msg_publish(name="s0", data="data")

Python subscribe examples

#!/usr/bin/python3

import time
from ovis_ldms import ldms

x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport w/ munge
x.connect(host="node0", port=411)

def msg_recv_cb(cli, sd, cb_arg):
    print(f"[{sd.name}]: {sd.data}")

def msg_sub_status_cb(ev, cb_arg):
    print(f"'{ev.match}' subscription status: {ev.status}")

def msg_unsub_status_cb(ev, cb_arg):
    print(f"'{ev.match}' unsubscription status: {ev.status}")

# Subscribe to messages that reach our process on "s0" channel.
# `msg_recv_cb()` will be called when an "s0" message reaches our process.
cli0 = ldms.MsgClient(match="s0", is_regex=False, cb=msg_recv_cb, cb_arg=None)

# Subscribe to messages that reach our process on channels matching "app.*".
# Since no `cb` is given, "app.*" data that reaches our process will be
# stored in cli1.
cli1 = ldms.MsgClient(match="app.*", is_regex=True)

# Request peer for message forwarding to us only on channel "s0".
# The status result of the subscription will be notified via
#  `msg_sub_status_cb`.
x.msg_subscribe("s0", is_regex=False, cb=msg_sub_status_cb, cb_arg=None)

# Request peer for message forwarding to us on channels matching "app.*".
# Since no `cb` is given, this call becomes blocking, waiting for the status
# event. An exception is raised if the subscription request resulted in an
# error.
x.msg_subscribe("app.*", is_regex=True)
print("app.* subscription status: 0") # b/c an exception is raised if failed

time.sleep(10) # wait a bit to get events

# "s0" messages were handled by `msg_recv_cb`.

# Data of "app.*" messages are stored in `cli1` since no `cb` was given.
sd = cli1.get_data()
while sd is not None:
    print(f"[{sd.name}]: {sd.data}")
    sd = cli1.get_data()

# Request "s0" subscription cancellation to peer; notify result via `cb`
x.msg_unsubscribe("s0", is_regex=False, cb=msg_unsub_status_cb, cb_arg=None)

# Request "app.*" subscription cancellation to peer; block-wait for result,
# raising an exception if failed.
x.msg_unsubscribe("app.*", is_regex=True)
print("app.* unsubscription status: 0") # b/c an exception is raised if failed

# Terminate message clients and the connection
cli0.close()
cli1.close()
x.close()

THREADING

This section describes the threading model of the Message Channel API and what is safe to call from each context.

Thread Safety of the API

All ldms_msg_chan_* functions are thread-safe. They use a per-channel mutex and the global channel lock, neither of which conflicts with application or ldmsd threads.

Internal Threads

Creating the first message channel in a process launches a scheduler thread shared across all channels process-wide. Each channel also gets its own I/O thread. These threads manage connection attempts, reconnection, and flushing the outbound message queue to the peer.

Blocking Behavior of ldms_msg_chan_publish()

ldms_msg_chan_publish() copies the message into the outbound queue and returns immediately. If the peer is not yet connected, messages accumulate in the queue and are sent once the connection is established. If adding the message would exceed the queue limit, ldms_msg_chan_publish() returns ENOBUFS and the message is not queued. The default queue limit is 1 MB and can be adjusted with ldms_msg_chan_set_q_limit().
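
The queueing rule can be sketched as a byte-limited outbound queue. This Python model is only an illustration of the behaviour described above: OutboundQueue, its methods and the send callable are hypothetical, and interpreting 1 MB as 1024 * 1024 bytes is an assumption.

```python
import errno
from collections import deque


class OutboundQueue:
    """Byte-limited queue that rejects messages once the limit is hit."""

    def __init__(self, q_limit=1024 * 1024):  # assumed meaning of "1 MB"
        self.q_limit = q_limit
        self.q_bytes = 0
        self.pending = deque()

    def publish(self, msg):
        """Queue a copy of msg; return 0, or ENOBUFS if it does not fit."""
        if self.q_bytes + len(msg) > self.q_limit:
            return errno.ENOBUFS  # message is not queued
        self.pending.append(bytes(msg))  # copy, so the caller may reuse msg
        self.q_bytes += len(msg)
        return 0

    def flush(self, send):
        """Send everything queued, e.g. once the peer is connected."""
        sent = 0
        while self.pending:
            msg = self.pending.popleft()
            self.q_bytes -= len(msg)
            send(msg)
            sent += 1
        return sent


q = OutboundQueue(q_limit=10)  # tiny limit to make the effect visible
rc1 = q.publish(b"hello")      # 5 bytes queued
rc2 = q.publish(b"world")      # 10 bytes queued, exactly at the limit
rc3 = q.publish(b"!")          # would exceed the limit

wire = []                      # stands in for the connected peer
sent = q.flush(wire.append)
rc4 = q.publish(b"!")          # space is available again
print(rc1, rc2, rc3 == errno.ENOBUFS, sent, rc4)
```

A publisher that sees ENOBUFS has to decide for itself whether to drop the message, retry later, or raise the limit; the queue never blocks the caller.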

Blocking Behavior of ldms_msg_chan_close()

With cancel=0, ldms_msg_chan_close() blocks until all queued messages have been delivered to the peer and the channel is fully closed. With cancel=1, queued messages are abandoned and the channel closes immediately.
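
The two close modes can be modelled with a worker thread that drains a queue. The following Python sketch is an illustration only; ChannelModel, its connected event and the return value of close() are hypothetical and do not correspond to the C API.

```python
import queue
import threading


class ChannelModel:
    """Toy channel: an I/O thread delivers queued messages when connected."""

    def __init__(self):
        self.outq = queue.Queue()
        self.delivered = []
        self.connected = threading.Event()
        self._stop = threading.Event()
        self._io = threading.Thread(target=self._io_loop, daemon=True)
        self._io.start()

    def _io_loop(self):
        while not self._stop.is_set():
            if not self.connected.wait(timeout=0.01):
                continue  # peer not reachable yet
            try:
                msg = self.outq.get(timeout=0.01)
            except queue.Empty:
                continue
            self.delivered.append(msg)
            self.outq.task_done()

    def publish(self, msg):
        self.outq.put(msg)

    def close(self, cancel):
        """Return the number of abandoned messages."""
        abandoned = 0
        if cancel:
            # Stop the I/O thread first, then discard what is left.
            self._stop.set()
            self._io.join()
            while True:
                try:
                    self.outq.get_nowait()
                except queue.Empty:
                    break
                abandoned += 1
        else:
            # Blocks until every queued message has been delivered.
            self.outq.join()
            self._stop.set()
            self._io.join()
        return abandoned


a = ChannelModel()
a.connected.set()
for i in range(3):
    a.publish(i)
a_abandoned = a.close(cancel=0)  # waits for delivery

b = ChannelModel()               # peer never connects
for i in range(3):
    b.publish(i)
b_abandoned = b.close(cancel=1)  # returns without delivering
print(a.delivered, a_abandoned, b.delivered, b_abandoned)
```

In this model, calling close(cancel=0) on a channel whose peer never connects would wait indefinitely, which is the reason a caller that cannot afford to block would choose cancel=1.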

Callback Invocation Thread

The callback registered with ldms_msg_chan_subscribe() is always invoked by an IO thread. A message channel only receives messages from a remote peer over a transport connection — there is no local in-process delivery path in ldms_msg_chan.

No LDMS Message Bus locks are held when the callback is invoked.

What Is Safe to Call from the Callback

Because no Message Bus locks are held during callback invocation, the following ldms_msg_chan_* functions are safe to call from within the callback:

  • ldms_msg_chan_publish() — to publish a message to the remote peer. ldms_msg_chan_publish() always delivers to the remote peer over the transport and never triggers inline local callback invocation, so there is no deadlock risk from calling it within a callback.

  • ldms_msg_chan_subscribe() — to register a new subscription on a channel, provided the channel was created in subscribe or bidir mode.

  • ldms_msg_chan_new() — to create a new channel.

  • ldms_msg_chan_set_q_limit() — to adjust the queue limit.

  • ldms_msg_chan_close() — safe from a locking standpoint, but calling it with cancel=0 from within the callback will block the IO thread for the duration of waiting for all queued messages to be delivered and all subscriptions to be torn down. Use cancel=1 if closing from within a callback.

Note on CLIENT_CLOSE

The LDMS_MSG_EVENT_CLIENT_CLOSE event is not delivered to the application callback when using ldms_msg_chan_subscribe() — it is handled internally by the channel. No teardown action is needed in the callback for subscription cleanup.

SEE ALSO

ldmsd_controller(8), ldms_msg(7)