NATS Transport
Add NATS Core transport to the minimal sender by deriving AGH Network subjects, subscribing to channel traffic, and preserving envelope correlation.
- Audience: Implementers designing interoperable agents
- Focus: Guidance shaped for scanability, day-two clarity, and operator context.
This tutorial adds a real transport to the minimal sender. You will publish and receive AGH Network envelopes over NATS Core using the v0 subject mapping.
Normative details live in the NATS binding reference and
delivery reference. The current AGH Runtime embeds NATS Core with
agh.network.v0 subjects, so this page starts there.
What you'll build
By the end, you will have a small NATS peer that:
- starts a local NATS server for the exercise
- subscribes to a channel broadcast subject
- subscribes to its own direct subject
- publishes a say broadcast envelope
- publishes a direct whois request and receives a response
- keeps envelope reply_to correlation even when using NATS request-reply
Map envelopes to subjects
AGH Network v0 uses two subject shapes:
| Intent | Subject |
|---|---|
| Broadcast to a channel | agh.network.v0.<channel>.broadcast |
| Direct to a peer | agh.network.v0.<channel>.peer.<route_token> |
The route token is the first 32 lowercase hex characters of SHA-256(peer_id UTF-8 bytes). The
envelope still uses the canonical peer ID in to; the route token is only for NATS subjects.
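Because the channel name is interpolated directly into the subject, a channel that contains a NATS separator or wildcard would change the subject's shape. The route token is always plain hex, so only the channel needs a check. The sketch below shows one way to guard the broadcast mapping. The helper names subjectToken and checkedBroadcastSubject are hypothetical, and the validation itself is an assumption of this sketch rather than a rule quoted from the NATS binding reference.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectToken reports whether s can serve as a single NATS subject token.
// The rules come from NATS itself: "." separates tokens, "*" and ">" are
// wildcards, and whitespace is not allowed in a subject.
func subjectToken(s string) bool {
	return s != "" && !strings.ContainsAny(s, ".*> \t\r\n")
}

// checkedBroadcastSubject builds the v0 broadcast subject and rejects channels
// that would alter its shape. Hypothetical helper: the check is an assumption
// of this sketch, not a requirement taken from the AGH Network binding.
func checkedBroadcastSubject(channel string) (string, error) {
	if !subjectToken(channel) {
		return "", fmt.Errorf("channel %q is not a single NATS subject token", channel)
	}
	return "agh.network.v0." + channel + ".broadcast", nil
}

func main() {
	for _, ch := range []string{"builders", "builders.peer", "*"} {
		subject, err := checkedBroadcastSubject(ch)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("ok:", subject)
	}
}
```

The same guard could wrap the direct subject; whether to reject or escape an unsafe channel is a policy choice the binding reference should settle.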
Write the NATS peer
This complete program starts an embedded NATS server for the tutorial, then connects two clients: one sender and one echo peer.
package main
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"time"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)
type Envelope struct {
Protocol string `json:"protocol"`
ID string `json:"id"`
Kind string `json:"kind"`
Channel string `json:"channel"`
From string `json:"from"`
To *string `json:"to"`
InteractionID *string `json:"interaction_id,omitempty"`
ReplyTo *string `json:"reply_to,omitempty"`
TraceID *string `json:"trace_id,omitempty"`
CausationID *string `json:"causation_id,omitempty"`
TS int64 `json:"ts"`
ExpiresAt *int64 `json:"expires_at,omitempty"`
Body map[string]any `json:"body"`
Proof map[string]any `json:"proof"`
Ext map[string]any `json:"ext,omitempty"`
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ns, url := startNATS()
defer ns.Shutdown()
sender, err := nats.Connect(url)
if err != nil {
log.Fatalf("connect sender: %v", err)
}
defer sender.Close()
echo, err := nats.Connect(url)
if err != nil {
log.Fatalf("connect echo: %v", err)
}
defer echo.Close()
channel := "builders"
senderID := "sender.demo"
echoID := "echo.demo"
if err := subscribeEchoPeer(echo, channel, echoID); err != nil {
log.Fatalf("subscribe echo peer: %v", err)
}
broadcast := envelope("msg_say_01", "say", channel, senderID, nil, map[string]any{
"text": "hello over NATS",
"intent": "demo",
})
if err := publish(sender, broadcastSubject(channel), broadcast); err != nil {
log.Fatalf("publish broadcast: %v", err)
}
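interactionID := "int_whois_01"
request := envelope("msg_whois_01", "whois", channel, senderID, &echoID, map[string]any{
"type": "request",
"query": echoID,
})
// Thread the request so the response can be tied to the same interaction.
request.InteractionID = &interactionID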
msg, err := requestEnvelope(ctx, sender, directSubject(channel, echoID), request)
if err != nil {
log.Fatalf("request whois: %v", err)
}
var response Envelope
if err := json.Unmarshal(msg.Data, &response); err != nil {
log.Fatalf("decode response: %v", err)
}
fmt.Printf("received %s reply_to=%s from=%s\n", response.Kind, value(response.ReplyTo), response.From)
}
func startNATS() (*server.Server, string) {
ns, err := server.NewServer(&server.Options{
Host: "127.0.0.1",
Port: -1,
})
if err != nil {
log.Fatalf("create nats server: %v", err)
}
ns.Start()
if !ns.ReadyForConnections(5 * time.Second) {
log.Fatal("nats server did not become ready")
}
return ns, ns.ClientURL()
}
func subscribeEchoPeer(conn *nats.Conn, channel string, peerID string) error {
if _, err := conn.Subscribe(broadcastSubject(channel), func(msg *nats.Msg) {
var env Envelope
if err := json.Unmarshal(msg.Data, &env); err == nil {
fmt.Printf("echo saw broadcast %s from=%s\n", env.Kind, env.From)
}
}); err != nil {
return err
}
_, err := conn.Subscribe(directSubject(channel, peerID), func(msg *nats.Msg) {
var req Envelope
if err := json.Unmarshal(msg.Data, &req); err != nil {
return
}
if req.Kind != "whois" || msg.Reply == "" {
return
}
response := envelope("msg_whois_response_01", "whois", req.Channel, peerID, &req.From, map[string]any{
"type": "response",
"peer_card": map[string]any{
"peer_id": peerID,
"display_name": "Echo Demo",
"profiles_supported": []string{"agh-network/v0"},
"capabilities": []string{"echo"},
"artifacts_supported": []string{},
"trust_modes_supported": []string{},
},
})
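// Correlate in the envelope itself; the NATS reply subject is only the return route.
response.ReplyTo = &req.ID
response.InteractionID = req.InteractionID
if err := publish(conn, msg.Reply, response); err != nil {
log.Printf("publish whois response: %v", err)
}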
})
if err != nil {
return err
}
return conn.Flush()
}
func envelope(id, kind, channel, from string, to *string, body map[string]any) Envelope {
return Envelope{
Protocol: "agh-network/v0",
ID: id,
Kind: kind,
Channel: channel,
From: from,
To: to,
TS: time.Now().UTC().Unix(),
Body: body,
Proof: nil,
}
}
func publish(conn *nats.Conn, subject string, env Envelope) error {
payload, err := json.Marshal(env)
if err != nil {
return fmt.Errorf("marshal envelope: %w", err)
}
if err := conn.Publish(subject, payload); err != nil {
return fmt.Errorf("publish %s: %w", subject, err)
}
return conn.Flush()
}
func requestEnvelope(ctx context.Context, conn *nats.Conn, subject string, env Envelope) (*nats.Msg, error) {
payload, err := json.Marshal(env)
if err != nil {
return nil, fmt.Errorf("marshal request envelope: %w", err)
}
return conn.RequestWithContext(ctx, subject, payload)
}
func broadcastSubject(channel string) string {
return "agh.network.v0." + channel + ".broadcast"
}
func directSubject(channel string, peerID string) string {
return "agh.network.v0." + channel + ".peer." + routeToken(peerID)
}
func routeToken(peerID string) string {
sum := sha256.Sum256([]byte(peerID))
return hex.EncodeToString(sum[:16])
}
func value(ptr *string) string {
if ptr == nil {
return ""
}
return *ptr
}
Language-agnostic pseudocode:
start local nats server
connect sender client
connect echo client
channel = "builders"
sender_id = "sender.demo"
echo_id = "echo.demo"
echo subscribes to:
"agh.network.v0.builders.broadcast"
"agh.network.v0.builders.peer." + route_token(echo_id)
sender publishes say envelope to:
"agh.network.v0.builders.broadcast"
sender sends whois request envelope to:
"agh.network.v0.builders.peer." + route_token(echo_id)
echo receives whois request:
build whois response envelope
set response.reply_to = request.id
set response.interaction_id = request.interaction_id
publish response to the NATS reply subject
sender decodes response
assert response.reply_to == request.id
Keep NATS replies secondary
The example uses NATS request-reply so the tutorial can run in one process. Do not treat the NATS reply subject as the protocol correlation model. AGH Network still relies on:
| Envelope field | Role |
|---|---|
| id | Unique message identity. |
| reply_to | Message being answered. |
| interaction_id | Logical interaction thread. |
| trace_id | Larger operational flow, when one exists. |
For v1 verified peers, the route token changes to the verified fingerprint and the prefix becomes
agh.network.v1; see the NATS binding reference. The core envelope correlation
fields stay authoritative.
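A receiver can enforce this by checking the envelope fields before accepting a message as the answer to a request, regardless of which NATS subject delivered it. The following is a minimal sketch: the function name answers is hypothetical, the trimmed Envelope type keeps only the correlation fields, and the rule that a threaded request requires a matching interaction_id is an assumption of this example.

```go
package main

import "fmt"

// Envelope carries only the correlation fields this sketch needs.
type Envelope struct {
	ID            string  `json:"id"`
	ReplyTo       *string `json:"reply_to,omitempty"`
	InteractionID *string `json:"interaction_id,omitempty"`
}

// answers reports whether response correlates with request using envelope
// fields only; the NATS reply subject plays no part in the decision.
func answers(request, response Envelope) bool {
	if response.ReplyTo == nil || *response.ReplyTo != request.ID {
		return false
	}
	// Assumption: when the request names an interaction, the response must
	// carry the same one.
	if request.InteractionID != nil {
		return response.InteractionID != nil &&
			*response.InteractionID == *request.InteractionID
	}
	return true
}

func main() {
	reqID, thread := "msg_whois_01", "int_whois_01"
	request := Envelope{ID: reqID, InteractionID: &thread}
	good := Envelope{ID: "msg_whois_response_01", ReplyTo: &reqID, InteractionID: &thread}
	stray := Envelope{ID: "msg_other_01"}

	fmt.Println(answers(request, good), answers(request, stray)) // prints "true false"
}
```

A check like this keeps working if the response later arrives on the peer's direct subject instead of a NATS reply inbox.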
Verify it works
Run the program:
go run ./nats-transport.go
Expected output:
echo saw broadcast say from=sender.demo
received whois reply_to=msg_whois_01 from=echo.demo
You now have a transport-level peer. The next step is to add trust verification.