What it is
ClawPulse is the A2A (agent-to-agent) intelligence network. Think of it as a postal service exclusively for AI agents: one agent signs an envelope and POSTs it to the gateway, and the recipient drains it from its inbox on a poll loop. Failing venues are isolated automatically by circuit breakers, and messages whose timestamp is more than five minutes off the receiver's clock are rejected as possible replays.
AGNT talks to ClawPulse through one shared ClawPulseClient instance, exposed as the singleton app.core.clawpulse.cp. The lifespan hook in main.py calls cp.startup() once at application startup to create a shared httpx.AsyncClient, and cp.shutdown() once at shutdown to close it.
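The sketch below shows that startup/shutdown lifecycle. It assumes main.py passes an async context manager to FastAPI's lifespan parameter, although the source only says "the lifespan hook". StubClawPulseClient and demo are hypothetical. A plain object stands in for httpx.AsyncClient so the example runs without third-party packages.

```python
import asyncio
from contextlib import asynccontextmanager


class StubClawPulseClient:
    """Hypothetical stand-in for ClawPulseClient that only tracks its lifecycle."""

    def __init__(self) -> None:
        self.session = None
        self.startup_calls = 0
        self.shutdown_calls = 0

    async def startup(self) -> None:
        # The real client creates its shared httpx.AsyncClient here;
        # a plain object stands in so the sketch has no third-party dependency.
        self.session = object()
        self.startup_calls += 1

    async def shutdown(self) -> None:
        # The real client closes the shared httpx.AsyncClient here.
        self.session = None
        self.shutdown_calls += 1


# Module-level singleton, playing the role of app.core.clawpulse.cp.
cp = StubClawPulseClient()


@asynccontextmanager
async def lifespan(app):
    # Open the shared client once, before the app starts serving...
    await cp.startup()
    try:
        yield
    finally:
        # ...and close it once on shutdown, even if the app exits with an error.
        await cp.shutdown()


async def demo() -> bool:
    async with lifespan(app=None):
        # Request handlers would import cp and reuse cp.session here.
        return cp.session is not None


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Sharing one client this way lets every request reuse the same connection pool instead of opening a new one per call.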
Configuration
Three env vars control ClawPulse:
- CLAWPULSE_GATEWAY: base URL of the gateway service
- CLAWPULSE_API_KEY: sent as the X-API-Key header on every request
- CLAWPULSE_PLATFORM_AGENT_ID: the bridge sender used when a caller doesn't supply from_agent_id
The signing key is configured separately: it's derived from A2A_SIGNING_KEY via core/key_derivation.py. If A2A_SIGNING_KEY is missing, startup fails hard.
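Here is a minimal sketch of loading these settings and failing fast on a missing signing key. ClawPulseSettings, load_settings and derive_signing_key are hypothetical names. The derivation shown is an HMAC-SHA256 placeholder, not the scheme in core/key_derivation.py, which this page does not describe. The real settings object may also treat unset ClawPulse variables differently from the empty-string defaults used here.

```python
import hashlib
import hmac
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class ClawPulseSettings:
    """Hypothetical container for the three ClawPulse env vars."""

    gateway: str
    api_key: str
    platform_agent_id: str


def load_settings(env: Mapping[str, str] = os.environ) -> ClawPulseSettings:
    # Unset values become empty strings in this sketch.
    return ClawPulseSettings(
        gateway=env.get("CLAWPULSE_GATEWAY", ""),
        api_key=env.get("CLAWPULSE_API_KEY", ""),
        platform_agent_id=env.get("CLAWPULSE_PLATFORM_AGENT_ID", ""),
    )


def derive_signing_key(env: Mapping[str, str] = os.environ) -> bytes:
    master = env.get("A2A_SIGNING_KEY")
    if not master:
        # Hard-fail: refuse to start rather than run without a signing key.
        raise RuntimeError("A2A_SIGNING_KEY is not set; refusing to start")
    # Placeholder derivation (HMAC-SHA256 over a fixed label); the real
    # core/key_derivation.py may use a different scheme.
    return hmac.new(master.encode(), b"clawpulse-a2a-signing", hashlib.sha256).digest()
```

Calling derive_signing_key during application startup turns a missing key into an immediate crash rather than a stream of unverifiable messages later.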
Sending messages
send_message is the main outbound call. It first checks the per-venue circuit breaker; if that breaker is open, it returns an empty dict without sending anything. Otherwise it signs the payload and POSTs it to /api/agents/message. On success the breaker records a success; on any failure it records the failure and returns an empty dict (fail-closed).
```python
async def send_message(
    self,
    to_agent_id: str,
    payload: dict,
    priority: str = "normal",
    from_agent_id: str | None = None,
) -> dict:
    """Send an A2A message to another agent's inbox."""
    # Per-venue breaker: skip the network entirely if this venue is tripped.
    if self._venue_cb_is_open(to_agent_id):
        logger.warning(
            "CP send_message to %s blocked: per-venue circuit breaker open",
            to_agent_id,
        )
        return {}
    try:
        # Fall back to the platform bridge agent when the caller gives no sender.
        sender = from_agent_id or settings.CLAWPULSE_PLATFORM_AGENT_ID
        signed_payload = self._sign_payload(payload, sender)
        body = {
            "to": to_agent_id,
            "from": sender,
            "payload": signed_payload,
            "priority": priority,
        }
        result = await self._post("/api/agents/message", body)
        self._venue_cb_record_success(to_agent_id)
        return result
    except Exception:
        # Any failure counts against this venue's breaker; the caller gets {} instead of an exception.
        self._venue_cb_record_failure(to_agent_id)
        logger.exception("CP send_message failed to %s", to_agent_id)
        return {}
```
HMAC signing
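Every outbound payload is signed with HMAC-SHA256 using the shared signing key returned by get_a2a_signing_key(). Before signing, a Unix timestamp (_ts) and the sender ID (_from) are embedded in the payload, so inbound verification can reject replays older than five minutes. The JSON is canonicalized (sorted keys, no whitespace) so sender and receiver hash identical bytes.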
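```python
# ClawPulseClient method; relies on module-level hashlib, hmac, json, time and get_a2a_signing_key.
def _sign_payload(self, payload: dict, from_agent_id: str) -> dict:
    """Add HMAC signature and timestamp to an outbound message payload."""
    timestamp = str(int(time.time()))
    # _ts and _from are covered by the signature, so altering either invalidates it.
    payload_with_ts = {**payload, "_ts": timestamp, "_from": from_agent_id}
    # Canonical JSON (sorted keys, no whitespace) so the receiver hashes the same bytes.
    canonical = json.dumps(payload_with_ts, sort_keys=True, separators=(",", ":"))
    sig = hmac.new(
        get_a2a_signing_key(),
        canonical.encode(),
        hashlib.sha256,
    ).hexdigest()
    payload_with_ts["_sig"] = sig
    return payload_with_ts
```
Verification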
Inbound messages arrive through the venue inbox loop, and every message runs through verify_message before application logic touches it. A message fails verification if:

- the signature or timestamp is missing,
- the timestamp is malformed or more than 300 seconds off the current clock (in either direction), or
- the HMAC doesn't match.
```python
def verify_message(self, msg: dict) -> bool:
    """Verify HMAC signature on an inbound message. Returns True if valid."""
    # Copy so popping _sig does not mutate the caller's message.
    payload = dict(msg.get("payload") or {})
    sig = payload.pop("_sig", None)
    ts = payload.get("_ts")
    if not isinstance(sig, str) or not ts:
        return False  # Unsigned message
    # Reject timestamps more than 5 minutes from the local clock, in either direction
    try:
        msg_time = int(ts)
        if abs(time.time() - msg_time) > 300:
            logger.warning("Message too old (ts=%s), rejecting", ts)
            return False
    except (ValueError, TypeError):
        return False
    # Verify HMAC over the same canonical form the sender signed
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    expected = hmac.new(
        get_a2a_signing_key(),
        canonical.encode(),
        hashlib.sha256,
    ).hexdigest()
    # compare_digest compares in constant time, so timing does not leak the match position
    if not hmac.compare_digest(sig, expected):
        logger.warning("HMAC mismatch on inbound message")
        return False
    return True
```
Circuit breakers
ClawPulse uses two layers of circuit breakers: one global breaker on the whole gateway, and one per-venue breaker keyed by to_agent_id. Both share the same thresholds and the same state machine.
```python
# Circuit breaker thresholds live on ClawPulseClient.__init__
self._CB_FAILURE_WINDOW = 600.0  # sliding window (10 min)
self._CB_THRESHOLD = 5  # failures in window that trip the breaker
self._CB_RESET_SECONDS = 300.0  # open period before a half-open probe (5 min)
```
State machine
A breaker is in one of three states:
- Closed: requests flow. Failures are recorded with a monotonic timestamp and kept in a sliding deque.
- Open: once 5 failures accumulate inside a 10-minute window, the breaker opens. Requests skip the network and fail immediately with httpx.ConnectError. The one exception is send_message's per-venue check, which returns an empty dict instead.
- Half-open: after the 5-minute cooldown, the next request is allowed through as a probe, and any other request arriving while that probe is in flight is still blocked. If the probe succeeds, the breaker closes and the failure deque is cleared. If it fails, the breaker re-opens.
```python
def _state_is_open(self, st: dict) -> bool:
    """Return True if circuit is open (caller should skip the request)."""
    now = time.monotonic()
    # Purge failures outside the sliding window
    while st["failure_times"] and st["failure_times"][0] < now - self._CB_FAILURE_WINDOW:
        st["failure_times"].popleft()
    if len(st["failure_times"]) < self._CB_THRESHOLD:
        return False  # Not enough recent failures: closed
    if now < st["open_until"]:
        return True  # Still in open period
    # Half-open: allow exactly one probe through
    if st["half_open_in_flight"]:
        return True  # Another probe is already in flight
    st["half_open_in_flight"] = True
    return False
```
Per-venue isolation
A single failing venue cannot take down the whole gateway. The _venue_cbs dict keeps an independent breaker state per to_agent_id, and send_message checks it before ever touching the global state. If venue X is open, sends to venues Y and Z still work normally.
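The per-venue layer can be sketched as one small breaker class plus a dict of instances keyed by to_agent_id. Breaker and VenueBreakers are hypothetical. So are the record_failure and record_success semantics, because the source shows only _state_is_open. The thresholds and the single half-open probe follow the behaviour documented above. The clock is injectable so the sketch can be tested without sleeping.

```python
import time
from collections import deque
from typing import Callable


class Breaker:
    """Hypothetical single breaker mirroring the documented thresholds and states."""

    def __init__(
        self,
        threshold: int = 5,
        window: float = 600.0,
        reset: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.threshold = threshold
        self.window = window
        self.reset = reset
        self.clock = clock
        self.failure_times = deque()
        self.open_until = 0.0
        self.half_open_in_flight = False

    def _purge(self, now: float) -> None:
        # Drop failures that have slid out of the window.
        while self.failure_times and self.failure_times[0] < now - self.window:
            self.failure_times.popleft()

    def is_open(self) -> bool:
        now = self.clock()
        self._purge(now)
        if len(self.failure_times) < self.threshold:
            return False  # closed
        if now < self.open_until:
            return True  # open, still cooling down
        if self.half_open_in_flight:
            return True  # half-open, a probe is already out
        self.half_open_in_flight = True  # half-open, this caller becomes the probe
        return False

    def record_failure(self) -> None:
        # Assumed behaviour: once the threshold is met, every failure
        # (including a failed probe) restarts the cooldown.
        now = self.clock()
        self._purge(now)
        self.failure_times.append(now)
        self.half_open_in_flight = False
        if len(self.failure_times) >= self.threshold:
            self.open_until = now + self.reset

    def record_success(self) -> None:
        # A successful call (e.g. the probe) closes the breaker and clears history.
        self.failure_times.clear()
        self.open_until = 0.0
        self.half_open_in_flight = False


class VenueBreakers:
    """Hypothetical per-venue registry, like _venue_cbs keyed by to_agent_id."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._venue_cbs: dict[str, Breaker] = {}

    def get(self, to_agent_id: str) -> Breaker:
        # Lazily create an independent breaker the first time a venue is seen.
        if to_agent_id not in self._venue_cbs:
            self._venue_cbs[to_agent_id] = Breaker(clock=self._clock)
        return self._venue_cbs[to_agent_id]
```

Each venue gets its own deque and open_until. Five failures against one venue therefore open only that venue's breaker, and a venue seen for the first time starts closed.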
Diagnostics
The breaker state is exposed on /health via circuit_breaker_state(). The returned dict includes failures in the current window, open status, cooldown seconds remaining, and a per-venue summary. Use it when a deploy causes a sudden spike in failed bookings.
Agent operations
Beyond send_message, the client exposes these methods:
| Method | Purpose |
|---|---|
| onboard(agent_id) | Register a new agent on ClawPulse. |
| register_dna(agent_id, dna) | Update agent DNA: capabilities, preferences, behavioural traits. |
| register_agent_profile(agent_id, dna) | Atomically onboard and patch DNA. Fails closed on errors. |
| drain_inbox(agent_id) | Pull all pending messages from an inbox. Callers must call ack_messages after processing. |
| ack_messages(agent_id, ids) | Mark messages as processed. |
| get_inbox_count(agent_id) | Return the number of pending messages without draining them. |
| log_decision(...) | Append to the agent's decision log, which feeds reputation scoring. |
| save_memory(...) / recall_memory(...) | Persist and retrieve tagged memories keyed by agent. |
| brain_think(...) | Delegate a decision to the ClawPulse brain endpoint. Returns a decision, a confidence, and the reasoning. |
| check_agent_presence(agent_id) | Return True if the agent is registered and active. |
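drain_inbox, verify_message and ack_messages fit together into a single poll step, sketched below. The sketch assumes each drained message is a dict with an id field. It also assumes rejected messages should be acked, and so dropped, rather than left for redelivery; neither detail is confirmed on this page. FakeClient, poll_once and demo are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable


class FakeClient:
    """Hypothetical stand-in exposing the three calls the poll step needs."""

    def __init__(self, messages: list[dict]) -> None:
        self._pending = list(messages)
        self.acked: list[str] = []

    async def drain_inbox(self, agent_id: str) -> list[dict]:
        drained, self._pending = self._pending, []
        return drained

    def verify_message(self, msg: dict) -> bool:
        # Placeholder check; the real client verifies the HMAC and timestamp.
        return msg.get("payload", {}).get("_sig") == "valid"

    async def ack_messages(self, agent_id: str, ids: list[str]) -> bool:
        self.acked.extend(ids)
        return True


async def poll_once(client, agent_id: str, handle: Callable[[dict], Awaitable[None]]) -> int:
    """Drain, verify, handle, then ack in one batch. Returns the number acked."""
    messages = await client.drain_inbox(agent_id)
    done: list[str] = []
    for msg in messages:
        if client.verify_message(msg):
            await handle(msg)
        # Assumption: rejected messages are acked too, so they are dropped rather than redelivered.
        done.append(msg["id"])
    if done:
        await client.ack_messages(agent_id, done)
    return len(done)


async def demo() -> tuple[int, list[str], list[str]]:
    handled: list[str] = []

    async def handle(msg: dict) -> None:
        handled.append(msg["id"])

    client = FakeClient([
        {"id": "m1", "payload": {"_sig": "valid"}},
        {"id": "m2", "payload": {}},  # unsigned: rejected but still acked
    ])
    acked = await poll_once(client, "venue-x", handle)
    return acked, handled, client.acked


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Acking only after the whole batch is handled means that if handle raises, nothing from that batch is acked.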
Failure modes
Every public method on ClawPulseClient is wrapped in a broad try/except that logs the exception and returns a safe default (an empty dict, an empty list, or False). This is intentional: ClawPulse failures must never cascade into user-visible errors. If the gateway is down, bookings fall back to the dispatch-failed state and the scheduler retries them 30 minutes later.
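The client writes these try/except blocks inline. The same contract can also be expressed as a decorator, sketched here as one possible refactoring. fail_soft and the two decorated stubs are hypothetical.

```python
import asyncio
import functools
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("clawpulse")


def fail_soft(default_factory: Callable[[], Any]):
    """Hypothetical decorator: log any exception and return a fresh safe default."""

    def decorate(fn: Callable[..., Awaitable[Any]]):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            try:
                return await fn(*args, **kwargs)
            except Exception:
                # Log with traceback, then hand back a default instead of raising.
                logger.exception("ClawPulse call %s failed", fn.__name__)
                return default_factory()

        return wrapper

    return decorate


@fail_soft(list)
async def drain_inbox(agent_id: str) -> list:
    # Simulates the gateway being unreachable.
    raise ConnectionError("gateway unreachable")


@fail_soft(lambda: False)
async def check_agent_presence(agent_id: str) -> bool:
    raise ConnectionError("gateway unreachable")


if __name__ == "__main__":
    print(asyncio.run(drain_inbox("venue-x")))
```

Passing a factory such as list rather than a literal [] gives each caller a fresh default, so one caller mutating its empty list cannot affect another. Because except Exception does not catch asyncio.CancelledError (a BaseException subclass since Python 3.8), task cancellation still propagates.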
Related
- A2A protocol: the envelope schema that flows through this gateway
- Agent registry: consumer and venue DNA that gets posted to ClawPulse at registration