Skip to content
AGNT

Backend · Core modules

ClawPulse gateway.

The ClawPulseClientclass — AGNT's async client for the ClawPulse A2A messaging network. It signs envelopes, checks circuit breakers, and delivers messages to other agents' inboxes. Every number on this page is taken directly from agnt-backend/app/core/clawpulse.py.

What it is

ClawPulse is the A2A (agent-to-agent) intelligence network. Think of it as a postal service exclusively for AI agents. One agent signs an envelope, POSTs it to the gateway, and the recipient drains it from their inbox on a poll loop. Failed venues are isolated automatically via circuit breakers; messages older than five minutes are rejected as replays.

AGNT itself talks to ClawPulse through a single singleton ClawPulseClient instance exposed as app.core.clawpulse.cp. The lifespan hook in main.py calls cp.startup() once to create a shared httpx.AsyncClient, and cp.shutdown() once to close it.

Configuration

Three env vars control ClawPulse:

  • CLAWPULSE_GATEWAY— base URL of the gateway service
  • CLAWPULSE_API_KEY — sent as X-API-Key on every request
  • CLAWPULSE_PLATFORM_AGENT_ID— the bridge sender used when a caller doesn't supply from_agent_id

The signing key is separate: it's derived from A2A_SIGNING_KEY via core/key_derivation.py. Missing this key is a hard-fail on startup.

Sending messages

send_message is the main outbound call. It checks the per-venue circuit breaker first, signs the payload, and POSTs to /api/agents/message. On success the breaker records a success; on failure it records the failure and returns an empty dict (fail-closed).

pythonapp/core/clawpulse.py — send_message
async def send_message(
    self,
    to_agent_id: str,
    payload: dict,
    priority: str = "normal",
    from_agent_id: str | None = None,
) -> dict:
    """Send an A2A message to another agent's inbox."""
    if self._venue_cb_is_open(to_agent_id):
        logger.warning(
            "CP send_message to %s blocked: per-venue circuit breaker open",
            to_agent_id,
        )
        return {}
    try:
        sender = from_agent_id or settings.CLAWPULSE_PLATFORM_AGENT_ID
        signed_payload = self._sign_payload(payload, sender)
        body = {
            "to": to_agent_id,
            "from": sender,
            "payload": signed_payload,
            "priority": priority,
        }
        result = await self._post("/api/agents/message", body)
        self._venue_cb_record_success(to_agent_id)
        return result
    except Exception:
        self._venue_cb_record_failure(to_agent_id)
        logger.exception("CP send_message failed to %s", to_agent_id)
        return {}

HMAC signing

Every outbound payload is signed with HMAC-SHA256 using a per-process signing key. The timestamp is embedded in the payload so inbound verification can reject replays older than five minutes.

pythonapp/core/clawpulse.py — _sign_payload
def _sign_payload(self, payload: dict, from_agent_id: str) -> dict:
    """Add HMAC signature and timestamp to an outbound message payload."""
    timestamp = str(int(time.time()))
    payload_with_ts = {**payload, "_ts": timestamp, "_from": from_agent_id}
    canonical = json.dumps(payload_with_ts, sort_keys=True, separators=(",", ":"))
    sig = hmac.new(
        get_a2a_signing_key(),
        canonical.encode(),
        hashlib.sha256,
    ).hexdigest()
    payload_with_ts["_sig"] = sig
    return payload_with_ts

Verification

Inbound messages flow in through the venue inbox loop. Every message runs through verify_messagebefore the application logic touches it. Messages fail verification if the signature is missing, the timestamp is more than 300 seconds off the current clock, or the HMAC doesn't match.

pythonapp/core/clawpulse.py — verify_message
def verify_message(self, msg: dict) -> bool:
    """Verify HMAC signature on an inbound message. Returns True if valid."""
    payload = msg.get("payload", {})
    sig = payload.pop("_sig", None)
    ts = payload.get("_ts")

    if not sig or not ts:
        return False  # Unsigned message

    # Reject messages older than 5 minutes
    try:
        msg_time = int(ts)
        if abs(time.time() - msg_time) > 300:
            logger.warning("Message too old (ts=%s), rejecting", ts)
            return False
    except (ValueError, TypeError):
        return False

    # Verify HMAC
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    expected = hmac.new(
        get_a2a_signing_key(),
        canonical.encode(),
        hashlib.sha256,
    ).hexdigest()

    if not hmac.compare_digest(sig, expected):
        logger.warning("HMAC mismatch on inbound message")
        return False

    return True

Circuit breakers

ClawPulse uses two layers of circuit breakers: one global breaker on the whole gateway, and one per-venue breaker keyed by to_agent_id. Both share the same thresholds and the same state machine.

pythonapp/core/clawpulse.py — __init__ constants
# Circuit breaker thresholds live on ClawPulseClient.__init__
self._CB_FAILURE_WINDOW = 600.0   # sliding window (10 min)
self._CB_THRESHOLD = 5            # failures in window that trip the breaker
self._CB_RESET_SECONDS = 300.0    # open period before a half-open probe (5 min)

State machine

A breaker is in one of three states:

  1. Closed— requests flow. Failures are recorded with a monotonic timestamp and kept in a sliding deque.
  2. Open— once 5 failures accumulate inside a 10-minute window, the breaker opens. All requests skip the network and raise httpx.ConnectError immediately.
  3. Half-open— after the 5-minute cooldown, the next request is allowed through as a probe. If it succeeds, the breaker closes and the failure deque is cleared. If it fails, the breaker re-opens.
pythonapp/core/clawpulse.py — _state_is_open
def _state_is_open(self, st: dict) -> bool:
    """Return True if circuit is open (caller should skip the request)."""
    now = time.monotonic()
    # Purge failures outside the sliding window
    while st["failure_times"] and st["failure_times"][0] < now - self._CB_FAILURE_WINDOW:
        st["failure_times"].popleft()

    if len(st["failure_times"]) < self._CB_THRESHOLD:
        return False  # Not enough recent failures — closed

    if now < st["open_until"]:
        return True  # Still in open period

    # Half-open: allow exactly one probe through
    if st["half_open_in_flight"]:
        return True  # Another probe is already in flight
    st["half_open_in_flight"] = True
    return False

Per-venue isolation

A single failing venue cannot take down the whole gateway. The _venue_cbs dict keeps an independent breaker state per to_agent_id, and send_message checks it before ever touching the global state. If venue X is open, sends to venues Y and Z still work normally.

Diagnostics

The breaker state is exposed on /health via circuit_breaker_state(). The returned dict includes failures in the current window, open status, cooldown seconds remaining, and a per-venue summary. Use it when a deploy causes a sudden spike in failed bookings.

Agent operations

Beyond send_message, the client exposes these methods:

MethodPurpose
onboard(agent_id)Register a new agent on ClawPulse.
register_dna(agent_id, dna)Update agent DNA — capabilities, preferences, behavioural traits.
register_agent_profile(agent_id, dna)Atomically onboard + patch DNA. Fails closed on errors.
drain_inbox(agent_id)Pull all pending messages from an inbox. Caller must ack_messages after processing.
ack_messages(agent_id, ids)Mark messages as processed.
get_inbox_count(agent_id)Peek the number of pending messages without draining.
log_decision(...)Append to the agent's decision log for reputation scoring.
save_memory(...) / recall_memory(...)Persist tagged memories keyed by agent.
brain_think(...)Delegate a decision to the ClawPulse brain endpoint. Returns decision, confidence, reasoning.
check_agent_presence(agent_id)Returns True if the agent is registered and active.

Failure modes

Every public method on ClawPulseClient is wrapped in a broadtry/except that logs the exception and returns a safe default (empty dict, empty list, False). This is intentional: ClawPulse failures must never cascade into user-visible errors. If the gateway is down, bookings fall back to the dispatch-failed state and the scheduler retries them 30 minutes later.

  • A2A protocol— the envelope schema that flows through this gateway
  • Agent registry— consumer and venue DNA that gets posted to ClawPulse at registration

Related