LDMS Python API Threading Model and Callback Contract

Section 1: The Threading Model

LDMS maintains a pool of I/O threads internally. These threads handle all network communication — sending, receiving, and delivering events. When you supply a callback to an LDMS Python API, that callback is called from one of these I/O threads, not from your application thread.

This means two things:

Your callback runs concurrently with your application. If your callback reads or writes data that your application thread also accesses, you need to protect that data with a lock or use a thread-safe data structure.

Your callback runs on a thread that LDMS needs to keep available. If your callback takes a long time — waiting on I/O, sleeping, or calling back into LDMS in a way that waits for a response — you are tying up an I/O thread that LDMS needs to deliver further events. In the worst case this leads to a deadlock. See Section 2: Two Rules for Every Callback.

Note

For developers familiar with the C API: the Python callbacks are called from the same zap I/O threads that drive the C-level event handlers. Python’s GIL is acquired before your callback is called and released when it returns, so only one Python callback runs at a time across all I/O threads.

Section 2: Two Rules for Every Callback

These rules apply to every callback in the API without exception.

Rule 1 — Do Not Call Blocking LDMS APIs from a Callback

The following methods wait internally for LDMS to deliver a response event before returning:

Method

What it waits for

Xprt.connect() (no cb)

Connection accepted or rejected

Xprt.accept()

Incoming connection

Xprt.dir() (no cb)

Directory reply

Xprt.lookup() (no cb)

Lookup reply

Xprt.recv()

Incoming message

Set.update() (no cb)

Update complete

Xprt.msg_subscribe() (no cb)

Subscribe status

Xprt.msg_unsubscribe() (no cb)

Unsubscribe status

Calling any of these from inside a callback leads to a deadlock, because the response event they are waiting for can only be delivered by the same I/O thread that is currently running your callback.

Each of these methods has a non-blocking form: pass a cb argument and the method returns immediately. The result is delivered to your callback when it is ready. Use the non-blocking form whenever you need to make further LDMS calls from within a callback.

Rule 2 — Do Not Rely on Exceptions to Signal Errors

If your callback raises an exception, it is printed to stderr and discarded. It does not propagate to your application and does not stop the I/O thread. Handle errors inside the callback directly — for example, by recording the error in a shared variable or signaling your application thread.

The one exception to this rule is the MessageChannel receive callback, which is wrapped in error handling so that exceptions are printed and the I/O thread always continues cleanly. All other callbacks have no such protection.

Section 3: Blocking and Non-Blocking Mode

Most LDMS operations can be used in either blocking or non-blocking mode.

In blocking mode, you call the method with no callback. The call does not return until the operation is complete. This is simpler to write and easier to reason about, but it ties up your application thread while it waits.

# Blocking — waits until the directory result is ready
sets = xprt.dir()

In non-blocking mode, you supply a callback. The method returns immediately and your callback is called from an I/O thread when the result is ready.

# Non-blocking — returns immediately; result delivered to callback
def on_dir(xprt, status, dir_data, arg):
    ...

xprt.dir(cb=on_dir, cb_arg=my_arg)

The sections below describe both modes for each callback, including what objects are returned or delivered and how long they remain valid.

Section 4: Transport API Callbacks

Transport Event Callback

Registered via: Xprt.connect(cb=..., cb_arg=...) or Xprt.listen(cb=..., cb_arg=...)

Transport events cover the full lifecycle of a connection: establishment, data receipt, and disconnection. The same callback signature is used whether you are connecting as a client or listening as a server.

def my_xprt_cb(xprt, ev, arg):
    pass
  • xprt — the Xprt object the event is for

  • ev — an XprtEvent describing what happened. ev.type is one of: EVENT_CONNECTED, EVENT_REJECTED, EVENT_ERROR, EVENT_DISCONNECTED, EVENT_RECV, EVENT_SEND_COMPLETE, EVENT_SEND_QUOTA_DEPOSITED, EVENT_SET_DELETE

  • arg — the cb_arg you supplied

One behavioral difference between client and server: On the server side (registered via listen()), the xprt delivered on EVENT_CONNECTED is the newly accepted connection, not the listening transport. All subsequent events for that connection — including EVENT_RECV and EVENT_DISCONNECTED — also arrive through this same callback with the accepted transport as xprt.

In non-blocking mode: ev and its contents are valid for the lifetime of your callback and safe to retain beyond it. For EVENT_RECV, the received data is available as a Python bytes object in ev and is safe to keep after the callback returns.

In blocking mode: There is no single blocking call for all transport events. Instead, individual blocking methods cover specific events:

  • Xprt.connect() (no cb) blocks until EVENT_CONNECTED, EVENT_REJECTED, or EVENT_ERROR

  • Xprt.accept() blocks until an incoming connection is ready, returning an Xprt object for the new connection

  • Xprt.recv() blocks until EVENT_RECV, returning the received data as bytes

Directory Callback

Registered via: Xprt.dir(cb=..., cb_arg=...)

A directory operation asks a remote peer for the list of sets it publishes. The result may arrive in multiple callbacks if the peer has many sets.

def my_dir_cb(xprt, status, dir_data, arg):
    pass
  • xprt — the Xprt the dir request was issued on

  • status — 0 on success, non-zero on error

  • dir_data — a DirData object whose set_data field is a list of DirSet objects, one per published set. dir_data.more is 1 if additional callbacks will follow for this request

  • arg — the cb_arg you supplied

In non-blocking mode: The DirSet objects in dir_data.set_data are plain Python objects and are safe to retain after the callback returns. You do not need to copy any data before returning.

In blocking mode: Xprt.dir() returns a Python list of DirSet objects once all results have arrived. The list is fully owned by the caller.

Lookup Callback

Registered via: Xprt.lookup(name, flags, cb=..., cb_arg=...)

A lookup operation retrieves a handle to one or more remote sets by name, schema, or regular expression. The result may arrive in multiple callbacks if multiple sets match.

def my_lookup_cb(xprt, status, more, lset, arg):
    pass
  • xprt — the Xprt the lookup was issued on

  • status — 0 on success, non-zero on error

  • more — 1 if more matching sets will follow; 0 if this is the last result

  • lset — a Set object for the matched set, or None on error

  • arg — the cb_arg you supplied

In non-blocking mode: lset is a Python object safe to retain beyond the callback. To read its current metric values you still need to call Set.update() — the lookup only gives you a handle, not the data.

In blocking mode: Xprt.lookup() returns a single Set object, or a list of Set objects if the lookup matched multiple sets. The returned objects are fully owned by the caller.

Update Callback

Registered via: Set.update(cb=..., cb_arg=...)

An update fetches the current metric values of a remote set. Multiple callbacks may follow a single update() call if the UPD_F_MORE flag is set.

def my_update_cb(lset, flags, arg):
    pass
  • lset — the Set whose data was updated

  • flags — bitwise OR of status flags:

    • UPD_F_MORE — more update callbacks will follow for this request

    • UPD_F_PUSH — this update was triggered by a push from the remote peer

    • UPD_F_PUSH_LAST — last push update; the push stream has ended

    • Use LDMS_UPD_ERROR(flags) to extract any error code

  • arg — the cb_arg you supplied

In non-blocking mode: lset’s metric values are current and stable during your callback. Copy any values you intend to use after the callback returns, as a subsequent update may overwrite them.

In blocking mode: Set.update() returns once the update is complete. The Set object’s metric values are current at the point of return and remain stable until the next update() call.

Push Callback

Registered via: Set.subscribe_push(cb=..., cb_arg=...)

Push delivers metric updates initiated by the remote peer rather than by your own update() calls. There is no blocking mode for push — updates are always delivered via callback.

def my_push_cb(lset, flags, arg):
    pass
  • lset — the Set whose data was pushed

  • flags — same meaning as in the update callback. UPD_F_PUSH is always set. UPD_F_PUSH_LAST means the remote peer has ended the push stream

  • arg — the cb_arg you supplied

The same rules about metric value lifetime apply as in the update callback.

Section 5: Message API Callbacks

Message Client Callback

Registered via: MsgClient(match, is_regex, cb=..., cb_arg=...)

A MsgClient subscribes to messages published on the message bus that match a name or pattern. There is no blocking mode — messages are always delivered via callback.

def my_msg_client_cb(client, msg_data, arg):
    pass
  • client — the MsgClient that received the message

  • msg_data — a MsgData object with message content, source address, credentials, and metadata. Safe to retain beyond the callback

  • arg — the cb_arg you supplied

Your callback is only invoked for received messages. The event that signals the client has been closed is handled internally and is not forwarded to your callback.

Message Subscribe Status Callback

Registered via: Xprt.msg_subscribe(match, is_regex, cb=..., cb_arg=...)

This callback confirms whether a remote subscription request succeeded or failed. It is called at most once per msg_subscribe() call.

def my_msg_subscribe_cb(ev, arg):
    pass
  • ev — a MsgStatusEvent with fields match (str), is_regex (int), and status (int; 0 = success)

  • arg — the cb_arg you supplied

In non-blocking mode: ev is a Python object safe to retain beyond the callback.

In blocking mode: Xprt.msg_subscribe() blocks until the status is received and returns normally on success, or raises MsgSubscribeError on failure. No callback object is delivered.

MessageChannel Receive Callback

Registered via: MessageChannel(..., msg_cb_fn=..., msg_cb_arg=...)

A MessageChannel provides a resilient, bidirectional message interface that manages its own transport connections automatically. There is no blocking mode — messages are always delivered via callback.

def my_chan_cb(msg_data, arg):
    pass
  • msg_data — a MsgData object with message content, source address, credentials, and metadata. Safe to retain beyond the callback

  • arg — the msg_cb_arg you supplied

This is the only callback where an exception raised in your code is handled safely — it is printed to stderr and the I/O thread continues. For all other callbacks, follow Rule 2 and handle errors inside the callback directly.

Section 6: Summary

Callback

Registered via

Has blocking mode

Multiple deliveries

Transport event

connect(cb=...) or listen(cb=...)

Yes — connect(), accept(), recv()

Yes — one per event type

Directory

xprt.dir(cb=...)

Yes — returns a list

Yes — until more=0

Lookup

xprt.lookup(cb=...)

Yes — returns a Set or list

Yes — until more=0

Update

set.update(cb=...)

Yes — blocks until complete

Yes — until UPD_F_MORE clear

Push

set.subscribe_push(cb=...)

No

Yes — until UPD_F_PUSH_LAST

Message client

MsgClient(cb=...)

No

Yes — one per message

Msg subscribe status

xprt.msg_subscribe(cb=...)

Yes — blocks until status received

No — one-shot

MessageChannel recv

MessageChannel(msg_cb_fn=...)

No

Yes — one per message

SEE ALSO

ldms_xprt_threading_model(7) ldms_msg(7) ldms_msg_chan(7)