ldms_msg

publish/subscribe data in LDMS message service

Date:

2024-12-03

Manual section:

7

Manual group:

LDMS

Version:

LDMS 4.5

SYNOPSIS

ldmsd_controller commands

prdcr_subscribe regex=PRDCR_REGEX message_tag=NAME_REGEX

prdcr_unsubscribe regex=PRDCR_REGEX message_tag=NAME_REGEX

C API

#include "ldms.h"

int ldms_msg_publish(ldms_t x, const char *msg_tag,
                        ldms_msg_type_t msg_type,
                        ldms_cred_t cred,
                        uint32_t perm,
                        const char *data, size_t data_len);

typedef int (*ldms_msg_event_cb_t)(ldms_msg_event_t ev, void *cb_arg);

ldms_msg_client_t ldms_msg_subscribe(const char *match,
                              int is_regex, ldms_msg_event_cb_t cb_fn,
                              void *cb_arg, const char *desc);
void ldms_msg_client_close(ldms_msg_client_t c);

int ldms_msg_remote_subscribe(ldms_t x, const char *match, int is_regex,
                                 ldms_msg_event_cb_t cb_fn, void *cb_arg,
                                 int64_t rate);
int ldms_msg_remote_unsubscribe(ldms_t x, const char *match, int is_regex,
                                   ldms_msg_event_cb_t cb_fn, void *cb_arg);

/* See "ldms.h" for the detailed API documentation */

Python APIs

from ovis_ldms import ldms

ldms.msg_publish(msg_tag=<str>, msg_data=<str|dict>,
            msg_type=<None|ldms.LDMS_MSG_STRING|ldms.LDMS_MSG_JSON>,
            perm=<int>)

xprt = ldms.Xprt()
xprt.connect(host="node0", port=411)

xprt.msg_publish(msg_tag=<str>, msg_data=<str|dict>,
            msg_type=<None|ldms.LDMS_MSG_STRING|ldms.LDMS_MSG_JSON>,
            perm=<int>)

xprt.msg_subscribe(match=<str>, is_regex=<bool>)

xprt.msg_unsubscribe(match=<str>, is_regex=<bool>)

cli = ldms.MsgClient(match=<str>, is_regex=<bool>, cb=<callable|None>,
                        cb_arg=<object|None>)
# MsgClient callback signature
def cb(MsgClient client, MsgData data, object cb_arg)

data = cli.get_data()

cli.close()

# for more detailed description and usage
help(ldms)

DESCRIPTION

LDMS Message Service is a service in LDMS for publishing variable-length data to LDMS processes, and for receiving such data from LDMS processes via message subscription. When published data arrives at an LDMS process, the message clients (msg_client) in that process that are authorized to see the data receive it via their callback function cb_fn(). If the LDMS process has remote subscribers, the data is also forwarded to those that are allowed to see it.

An LDMS Daemon (ldmsd) has to be configured with prdcr_subscribe commands in order to receive message data from its producers (prdcr). prdcr_subscribe can be issued many times, e.g.

# subscribe "s0" message tag on all producers
prdcr_subscribe regex=.* message_tag=s0
# subscribe "s1" message tag on all producers
prdcr_subscribe regex=.* message_tag=s1

The message_tag parameter can also be a regular expression, e.g.

# subscribe message tags matching "app.*" or "sys.*" on all producers
prdcr_subscribe regex=.* message_tag=app.*
prdcr_subscribe regex=.* message_tag=sys.*
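A tag expression can be previewed offline before it goes into a configuration. The sketch below uses Python's re module purely as a stand-in: this page does not state which regex dialect LDMS uses or whether matching is anchored, so both behaviours are shown. The helper name preview_tags and the sample tags are hypothetical.

```python
import re

def preview_tags(pattern, tags, anchored):
    """Return the tags selected by `pattern`.

    anchored=True  -> the whole tag must match (re.fullmatch)
    anchored=False -> a match anywhere in the tag counts (re.search)
    """
    rx = re.compile(pattern)
    matcher = rx.fullmatch if anchored else rx.search
    return [t for t in tags if matcher(t)]

# Hypothetical tag names, for illustration only
tags = ["app0", "app.io", "myapp", "sys.mem", "s0"]

print(preview_tags("app.*", tags, anchored=True))
print(preview_tags("app.*", tags, anchored=False))
# An explicit "^" gives the same selection under either behaviour
print(preview_tags("^app.*", tags, anchored=False))
```

If the intent is "tags that start with app", writing the anchor explicitly (^app.*) avoids depending on which matching behaviour applies.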

This is the setup for the following figure:

  • bob_app: an application run by bob. It LDMS-connects to samp.

  • samp: an LDMS daemon (sampler).

    • A plugin in samp has an LDMS Message Client cli that subscribes to all message_tags (regex .*).

    • Another plugin plug0 in samp publishes to s1 message_tag.

  • agg: another LDMS daemon (aggregator). It has an LDMS connection to samp.

    • agg subscribes to .* message_tags on samp with the following command:

      • prdcr_subscribe regex=samp message_tag=.*

  • alice_app: an application run by alice that LDMS-connects to agg.

    • alice_app subscribes to s0 on agg.

    • alice_app has an LDMS Message Client cli that subscribes to the s0 message_tag.

The --> arrows illustrate possible message data paths.

                  ┌──────────────┐         ┌────────┐
┌───────────┐     │     samp     │         │  agg   │
│bob_app    │     ├──────────────┤         ├────────┤
├───────────┤     │   .----.     │         │ .----. │
│           │  .----->|ldms|---------------->|ldms| │
│publish(s0)│  |  │   '----'<---.│         │ '----' │
│  |        │  |  │     |       |│         └────|───┘
│  v        │  |  │.----'       |│      .-------'
│.----.     │  |  │| .------.   |│      | ┌────────────┐
│|ldms|--------'  │| |cli:.*|   |│      | │ alice_app  │
│'----'     │     │| |------|   |│      | ├────────────┤
└───────────┘     │'>|cb_fn |   |│      | │   .----.   │
                  │  '------'   |│      '---->|ldms|--.│
                  │             |│        │   '----'  |│
                  │             |│        │           |│
                  │.-----------.|│        │           |│
                  │|  plug0    ||│        │  .------. |│
                  │|-----------||│        │  |cli:s0| |│
                  │|publish(s1)|'│        │  |------| |│
                  │'-----------' │        │  |cb_fn |<'│
                  └──────────────┘        │  '------'  │
                                          └────────────┘

bob_app publishes a message by calling the ldms_msg_publish() function. Let’s assume that bob_app publishes an s0 message over the LDMS transport to samp with 0400 permission.

When the s0 message from bob_app arrives at the samp daemon, the logic in the ldms library does the following:

  1. Credential check: the ldms library checks the credential in the message against the credential in the transport. If they are not the same, the message is dropped to prevent user impersonation. The exception is root, which may impersonate any user so that ldmsd daemons can propagate user messages on behalf of the user.

  2. Client iteration: the ldms library goes through all clients that subscribe to the s0 tag (including the matching clients that subscribe with a regular expression).

  3. Authorization check: the ldms library then checks whether each client is allowed to see the data, using the credential information in the client and the credential and permission information in the message.

  4. Callback: cb_fn() is called for each authorized client. Examples of information available in the msg callback event are the message tag name, the message data, and the original publisher’s uid, gid and address. Currently, a user can publish data to any tag. It is up to the receiver side to decide what to do.
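The four steps above can be modelled in a few lines of Python. This is a toy sketch, not the LDMS implementation: the Cred, Msg and Client types, the deliver() function, and in particular the can_read() rule (Unix-like read bits, with root allowed to read everything) are assumptions made for illustration.

```python
import re
from collections import namedtuple

# Hypothetical stand-ins for the library's internal types
Cred = namedtuple("Cred", "uid gid")
Msg = namedtuple("Msg", "tag data cred perm")
Client = namedtuple("Client", "name match is_regex cred cb_fn")

def tag_matches(client, tag):
    """Exact comparison, or a regex search when the client used is_regex."""
    if client.is_regex:
        return re.search(client.match, tag) is not None
    return client.match == tag

def can_read(reader, owner, perm):
    """ASSUMPTION: perm is read like Unix read bits; root reads everything."""
    if reader.uid == 0:
        return True
    if perm & 0o004:
        return True
    if reader.uid == owner.uid and perm & 0o400:
        return True
    if reader.gid == owner.gid and perm & 0o040:
        return True
    return False

def deliver(msg, xprt_cred, clients):
    """Return the names of clients whose cb_fn ran, or None if dropped."""
    # 1. Credential check: only root may carry another user's credential
    if xprt_cred.uid != 0 and xprt_cred != msg.cred:
        return None
    called = []
    for c in clients:                                   # 2. client iteration
        if not tag_matches(c, msg.tag):
            continue
        if not can_read(c.cred, msg.cred, msg.perm):    # 3. authorization
            continue
        c.cb_fn(c, msg)                                 # 4. callback
        called.append(c.name)
    return called

publisher, stranger, root = Cred(1000, 1000), Cred(2000, 2000), Cred(0, 0)
received = []
cb = lambda c, m: received.append((c.name, m.tag, m.data))
clients = [Client("all_tags", ".*", True, publisher, cb),
           Client("s0_only", "s0", False, stranger, cb)]

print(deliver(Msg("s0", "private", publisher, 0o400), publisher, clients))
print(deliver(Msg("s0", "public", publisher, 0o444), publisher, clients))
print(deliver(Msg("s0", "spoofed", publisher, 0o444), stranger, clients))
print(deliver(Msg("s0", "relayed", publisher, 0o400), root, clients))
```

In this model the 0400 message reaches only the client owned by the publisher, the 0444 message reaches both clients, the spoofed message is dropped at step 1, and a root-owned transport may relay the publisher's message unchanged.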

In this particular case, there are two clients on samp: the cli that subscribes to all tags (regex .*), and a hidden client for the remote subscription (remote client for short), created when samp received the subscription request from agg (issued by the prdcr_subscribe command on agg). The cb_fn() of the remote client is an internal function in the LDMS library that forwards the message to the subscribing peer. Note that the credential of the remote client is the credential from the LDMS transport authentication.

Now the s0 message has reached agg, which has only one remote client: alice_app, subscribing to the s0 tag. The ldms logic in agg will NOT forward this particular message to alice_app because bob_app, the original publisher, set the permission to 0400.

If bob_app publishes another message on the s0 tag to samp with 0444 permission, agg will forward it to alice_app when it arrives. cb_fn() on alice_app is called once the s0 data reaches it.

On another path, let’s consider publish(s1) in plug0 plugin in samp process. When plug0 publishes s1 with NULL transport (publishing locally), the ldms library in samp process does the same thing as if the data were received from a remote peer. The cli client in another plugin that subscribed for all tags will get the data (via cb_fn()), and the remote client to agg will also get the data if authorized.

CREDENTIALS AND PERMISSIONS

The ldms_msg_publish() function in C and the msg_publish() method in Python both take a credential cred and a permission perm. If cred is not set, the UID/GID of the process are used. If a non-root user tries to impersonate another user, the ldms library on the receiver side drops the message. Root is allowed to impersonate other UID/GID so that the credentials of users’ messages are preserved when the messages are propagated. Before a message is forwarded to a remote client, the remote client’s credential is checked against cred and perm to determine whether it is allowed to see the data.
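When choosing a perm value, it can help to spell out which readers it admits. The sketch below assumes perm is interpreted like the read bits of a Unix file mode, which is consistent with the 0400 and 0444 cases described earlier but is not spelled out in this page. The function name readers_allowed is hypothetical.

```python
import stat

def readers_allowed(perm):
    """List the reader classes admitted by `perm`.

    ASSUMPTION: only the three read bits matter, as for a Unix file mode;
    write and execute bits are ignored in this sketch.
    """
    classes = []
    if perm & stat.S_IRUSR:   # 0o400
        classes.append("publisher uid")
    if perm & stat.S_IRGRP:   # 0o040
        classes.append("publisher gid")
    if perm & stat.S_IROTH:   # 0o004
        classes.append("everyone else")
    return classes

for perm in (0o400, 0o440, 0o444):
    print(f"{perm:04o}: {', '.join(readers_allowed(perm))}")
```

Note that Python needs the 0o prefix for octal literals (0o400), whereas C accepts a leading zero (0400).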

CODE EXAMPLES

C publish example

#include "ldms.h"

int main(int argc, char **argv)
{
    ldms_t x;
    int rc;
    x = ldms_xprt_new_with_auth("sock", "munge", NULL);
    /* synchronous connect for simplicity */
    rc = ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL);
    if (rc)
        return rc;

    /* publish to peer; 5 = strlen("data") + 1 for the terminating NUL */
    rc = ldms_msg_publish(x, "s0", LDMS_MSG_STRING, NULL,
                             0400, "data", 5);
    if (rc)
        return rc;

    /* publish to our process */
    rc = ldms_msg_publish(NULL, "json_tag", LDMS_MSG_JSON, NULL,
                             0400, "{\"attr\":\"value\"}", 17);
    return rc;
}

C subscribe example

#include <assert.h>  /* assert() */
#include <stdio.h>
#include <stdlib.h>  /* malloc(), free() */
#include <unistd.h>
#include "ldms.h"

int cb_fn0(ldms_msg_event_t ev, void *cb_arg);
int success_cb(ldms_msg_event_t ev, void *cb_arg);

int main(int argc, char **argv)
{
    int rc;
    ldms_t x;
    ldms_msg_client_t cli0;
    char *cli0_ctxt;

    /* connect to an ldmsd (synchronous, since no callback is given) */
    x = ldms_xprt_new_with_auth("sock", "munge", NULL);
    rc = ldms_xprt_connect_by_name(x, "node1", "411", NULL, NULL);
    if (rc)
        return rc;

    cli0_ctxt = malloc(1024); /* some application context for cli0 */
    printf("cli0_ctxt: %p\n", cli0_ctxt);

    /* subscribe "s0" messages that reached us; cb_fn0 is the callback function */
    cli0 = ldms_msg_subscribe("s0", 0, cb_fn0, cli0_ctxt, "s0 only");

    /* Ask ldmsd to forward "s0" messages to us;
     * There will be NO success report callback since the function is `NULL`. */
    rc = ldms_msg_remote_subscribe(x, "s0", 0, NULL, NULL, LDMS_UNLIMITED);
    if (rc)
        return rc;
    /* The non-zero `rc` is a synchronous error that can still be returned,
     * e.g. EIO, ENOMEM, ENAMETOOLONG. */

    /* ask ldmsd to forward messages with tags matching "app.*" regex to
     * us.  `success_cb()` will be called once we know the result of the
     * subscription. */
    rc = ldms_msg_remote_subscribe(x, "app.*", 1, success_cb, NULL, LDMS_UNLIMITED);
    if (rc)
        return rc;

    sleep(10); /* sleep 10 sec */

    /* Request an unsubscription to "s0" tag. Note that the `match` must
     * match the subscription request. The `success_cb` is called to report
     * the success/failed result of the unsubscription request. */
    rc = ldms_msg_remote_unsubscribe(x, "s0", 0, success_cb, NULL);
    if (rc)
        return rc;

    /* Request an unsubscription to "app.*" tags. Note that the `match`
     * must match the subscription request. The `success_cb` is called to
     * report the success/failed result of the unsubscription request. */
    rc = ldms_msg_remote_unsubscribe(x, "app.*", 1, success_cb, NULL);
    if (rc)
        return rc;

    ldms_msg_client_close(cli0);

    sleep(5); /* wait a bit so that we can see the events */

    return 0;
}

int cb_fn0(ldms_msg_event_t ev, void *cb_arg)
{
    if (ev->type == LDMS_MSG_EVENT_CLIENT_CLOSE) {
        /*
         * The client is "closed". We can clean up resources
         * associated with it here. No more event will occur
         * on this client.
         */
        ldms_msg_client_t cli = ev->close.client;
        printf("client closed:\n");
        printf(" - match: %s\n", ldms_msg_client_match_get(cli));
        printf(" - is_regex: %d\n", ldms_msg_client_is_regex_get(cli));
        printf(" - desc: %s\n", ldms_msg_client_desc_get(cli));
        printf("freeing cli0_ctxt: %p\n", cb_arg);
        free(cb_arg);
        return 0;
    }
    assert(ev->type == LDMS_MSG_EVENT_RECV);
    /* we expect RECV event or CLOSE event only */
    if (ev->recv.type == LDMS_MSG_STRING) {
        printf("msg_tag: %s\n", ev->recv.msg_tag);
        printf("message data: %s\n", ev->recv.data);
    }
    if (ev->recv.type == LDMS_MSG_JSON) {
        /* process `ev->recv.json` */
    }
    return 0;
}

int success_cb(ldms_msg_event_t ev, void *cb_arg)
{
    switch (ev->type) {
    case LDMS_MSG_EVENT_SUBSCRIBE_STATUS:
        printf("'%s' subscription status: %d\n", ev->status.match,
                                                        ev->status.status);
        break;
    case LDMS_MSG_EVENT_UNSUBSCRIBE_STATUS:
        printf("'%s' unsubscription status: %d\n", ev->status.match,
                                                          ev->status.status);
        break;
    default:
        printf("Unexpected event: %d\n", ev->type);
    }
    return 0;
}

Python publish examples

from ovis_ldms import ldms
x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport w/ munge
x.connect(host="localhost", port=411)

# Explicitly specify STRING type.
x.msg_publish(msg_tag="s0", data="somedata", msg_type=ldms.LDMS_MSG_STRING,
              perm=0o400)

# JSON; the `dict` data will be converted to JSON
x.msg_publish(msg_tag="s0", data={"attr": "value"}, msg_type=ldms.LDMS_MSG_JSON,
              perm=0o400)

# Assumed STRING type if data is `str` or `bytes` when `msg_type` is omitted
x.msg_publish(msg_tag="s0", data="somedata", perm=0o400)

# Assumed JSON type if data is `dict` when `msg_type` is omitted
x.msg_publish(msg_tag="app0", data={"attr": "value"}, perm=0o400)

# We can publish to our process too
ldms.msg_publish(msg_tag="s0", data="data")

Python subscribe examples

#!/usr/bin/python3

import time
from ovis_ldms import ldms

x = ldms.Xprt(name="sock", auth="munge") # LDMS socket transport w/ munge
x.connect(host="node0", port=411)

def msg_recv_cb(cli, sd, cb_arg):
    print(f"[{sd.msg_tag}]: {sd.data}")

def msg_sub_status_cb(ev, cb_arg):
    print(f"'{ev.match}' subscription status: {ev.status}")

def msg_unsub_status_cb(ev, cb_arg):
    print(f"'{ev.match}' unsubscription status: {ev.status}")

# Subscribe to messages that reach our process on the "s0" tag.
# `msg_recv_cb()` will be called when an "s0" message reaches our process.
cli0 = ldms.MsgClient(match="s0", is_regex=False, cb=msg_recv_cb, cb_arg=None)

# Subscribe to messages that reach our process on tags matching "app.*".
# Since no `cb` is given, "app.*" data that reaches our process will be
# stored in cli1.
cli1 = ldms.MsgClient(match="app.*", is_regex=True)

# Ask the peer to forward messages to us only on tag "s0".
# The result of the subscription request is reported via
#  `msg_sub_status_cb`.
x.msg_subscribe("s0", is_regex=False, cb=msg_sub_status_cb, cb_arg=None)

# Request peer for message forwarding to us on tags matching "app.*".
# Since no `cb` is given, this call becomes blocking, waiting for the status
# event. An exception is raised if the subscription request resulted in an
# error.
x.msg_subscribe("app.*", is_regex=True)
print("app.* subscription status: 0") # b/c an exception is raised if failed

time.sleep(10) # wait a bit to get events

# "s0" messages were handled by `msg_recv_cb`.

# Data of "app.*" messages are stored in `cli1` since no `cb` was given.
sd = cli1.get_data()
while sd is not None:
    print(f"[{sd.msg_tag}]: {sd.data}")
    sd = cli1.get_data()

# Request "s0" subscription cancellation to peer; notify result via `cb`
x.msg_unsubscribe("s0", is_regex=False, cb=msg_unsub_status_cb, cb_arg=None)

# Request "app.*" subscription cancellation to peer; block-wait for result,
# raising an exception if failed.
x.msg_unsubscribe("app.*", is_regex=True)
print("app.* unsubscription status: 0") # b/c an exception is raised if failed

# Terminate message clients and the connection
cli0.close()
cli1.close()
x.close()

THREADING

This section describes the threading model of the LDMS Message Bus API and what is safe to call from each context.

Thread Safety of the API

ldms_msg_subscribe(), ldms_msg_client_close(), and ldms_msg_publish() are all thread-safe. They use internal locks (__msg_rwlock and per-tag rwlocks) that are independent of any application or ldmsd thread. No caller-held lock is acquired by these functions.

Callback Invocation Thread

The thread that invokes cb_fn() depends on the origin of the message:

  • For messages published within the same process (ldms_msg_publish() with x=NULL), the callback is invoked by whichever thread called ldms_msg_publish() — inline, before ldms_msg_publish() returns.

  • For messages arriving from a remote peer, the callback is invoked by the IO thread handling the incoming transport data.

No LDMS Message Bus locks are held when cb_fn() is invoked. This means all ldms_msg_* functions are safe to call from within the callback.

What Is Safe to Call from the Callback

Because no Message Bus locks are held during callback invocation, the following are safe to call from within cb_fn():

  • ldms_msg_publish() — to forward or re-publish a message

  • ldms_msg_subscribe() — to register a new subscription

  • ldms_msg_client_close() — to close the current or another subscription

Lock discipline when publishing from the callback: ldms_msg_publish(NULL, ...) invokes subscriber callbacks inline before returning. If the caller holds an application-level lock when calling ldms_msg_publish() from within a callback, and any subscriber of the published tag also tries to acquire the same lock, a deadlock results. Avoid holding application-level locks across a ldms_msg_publish() call inside a callback.
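The lock hazard described above can be reproduced without LDMS. The sketch below uses a toy in-process bus whose publish() runs subscriber callbacks inline, mirroring the behaviour described for ldms_msg_publish(NULL, ...). LocalBus, app_lock and the callback names are hypothetical, and a timed acquire stands in for the blocking acquire that would hang in a real program.

```python
import threading

class LocalBus:
    """Toy stand-in for local publishing: callbacks run inline in publish()."""
    def __init__(self):
        self._subs = {}

    def subscribe(self, tag, cb):
        self._subs.setdefault(tag, []).append(cb)

    def publish(self, tag, data):
        for cb in list(self._subs.get(tag, [])):
            cb(tag, data)   # runs in the caller's thread, before returning

bus = LocalBus()
app_lock = threading.Lock()   # non-reentrant application-level lock
log = []

def on_out(tag, data):
    # This subscriber needs app_lock. A blocking acquire would never return
    # if the publisher still held the lock; the timeout lets the sketch finish.
    got = app_lock.acquire(timeout=0.1)
    log.append("got lock" if got else "would deadlock")
    if got:
        app_lock.release()

def on_in_bad(tag, data):
    with app_lock:
        bus.publish("out", data)     # lock held across publish: hazard

def on_in_good(tag, data):
    with app_lock:
        payload = data.upper()       # finish the locked work first
    bus.publish("out", payload)      # publish only after releasing the lock

bus.subscribe("out", on_out)
bus.subscribe("in_bad", on_in_bad)
bus.subscribe("in_good", on_in_good)

bus.publish("in_bad", "x")
bus.publish("in_good", "x")
print(log)
```

The second callback shows the safer shape: copy or compute what is needed while holding the lock, release it, then publish.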

Subscription Teardown and the CLIENT_CLOSE Event

ldms_msg_client_close() begins teardown but returns before it is complete. After it returns:

  • No new LDMS_MSG_EVENT_RECV callbacks will be dispatched for this client.

  • The LDMS_MSG_EVENT_CLIENT_CLOSE event will be delivered asynchronously by an internal thread (ldms_strm_cls). The application must not free resources used by the callback until this event arrives.

  • The client handle must not be used after ldms_msg_client_close() is called.

LDMS_MSG_EVENT_CLIENT_CLOSE is the last event guaranteed to be delivered to the client. It is safe to free all resources associated with the subscription at this point.

A typical teardown pattern using a condition variable:

static int cb_fn(ldms_msg_event_t ev, void *cb_arg)
{
    struct my_ctxt_s *ctxt = cb_arg;
    switch (ev->type) {
    case LDMS_MSG_EVENT_RECV:
        /* process message */
        break;
    case LDMS_MSG_EVENT_CLIENT_CLOSE:
        pthread_mutex_lock(&ctxt->lock);
        ctxt->client = NULL;
        pthread_cond_signal(&ctxt->close_cond);
        pthread_mutex_unlock(&ctxt->lock);
        /* Free resources associated with this subscription here */
        break;
    }
    return 0;
}

static void close_client(struct my_ctxt_s *ctxt)
{
    if (!ctxt->client)
        return;
    ldms_msg_client_close(ctxt->client);
    pthread_mutex_lock(&ctxt->lock);
    while (ctxt->client != NULL)
        pthread_cond_wait(&ctxt->close_cond, &ctxt->lock);
    pthread_mutex_unlock(&ctxt->lock);
    /* Safe: no more callbacks will be invoked */
}

SEE ALSO

ldmsd_controller(8), ldms_msg_chan(7)