Skip to main content

IAM Service -- Event Catalog

Overview

The IAM service publishes domain events whenever a state change occurs. Events represent facts (something that happened), not intentions. They are published via a transactional outbox: the domain write and the outbox insert happen in a single database transaction, guaranteeing that every committed state change produces exactly one event record.

A background relay publishes pending outbox records to RabbitMQ. Delivery is at-least-once -- the same event may be delivered more than once. Consumers must be idempotent.

Events are only emitted on real state changes. Idempotent no-op retries (e.g., suspending an already-suspended tenant) do not produce events.


Connection Setup

Exchange

PropertyValue
Exchange nameiam.events
Typetopic
Durabletrue
Auto-deletefalse

Queue Declaration

Consumers must declare their own durable queue and bind it to the exchange. Queue names should be unique per consuming service (e.g., billing-iam-consumer, notifications-iam-consumer).

Binding Patterns

The routing key format is {aggregate_type}.{event_type}.

PatternMatches
#All events
tenant.*tenant.created, tenant.suspended, tenant.reactivated
tenant.createdOnly tenant creation events
membership.*membership.created, membership.suspended, membership.reactivated
invitation.*invitation.accepted, invitation.revoked
user.*user.created, user.suspended, user.reactivated
membership.user.role.*membership.user.role.assigned, membership.user.role.unassigned
invitation.user.invitedinvitation.user.invited
realm.*realm.created
role.*role.created
permission.*permission.created

Note: The routing key is constructed as {aggregate_type}.{event_type}. For events where the event type itself contains dots (e.g., user.role.assigned on the membership aggregate), the full routing key becomes membership.user.role.assigned. Plan your binding patterns accordingly.


Message Structure

Each AMQP message is structured as follows:

AMQP Properties

PropertyDescriptionExample
MessageIdUnique event UUID. Use for deduplication.f47ac10b-58cc-4372-a567-0e02b2c3d479
ContentTypeAlways application/json.application/json
TimestampWhen the event occurred.2025-01-15T10:30:00Z

AMQP Headers

HeaderTypeAlways PresentDescription
event_typestringYesThe event type (e.g., tenant.created).
aggregate_typestringYesThe aggregate that owns this event (e.g., tenant).
aggregate_idstringYesUUID of the aggregate instance.
tenant_idstringNoUUID of the tenant scope. Absent for global resources (users, realms, permissions).
occurred_atstringYesRFC3339Nano timestamp.
schema_versionint32YesPayload schema version. Currently 1.

Body

The body is a JSON object containing domain-specific fields. The exact fields depend on the event type and are documented in the catalog below.

Example Raw Message

Properties:
MessageId: "f47ac10b-58cc-4372-a567-0e02b2c3d479"
ContentType: "application/json"
Timestamp: 2025-01-15T10:30:00Z

Headers:
event_type: "tenant.created"
aggregate_type: "tenant"
aggregate_id: "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
tenant_id: "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
occurred_at: "2025-01-15T10:30:00.123456789Z"
schema_version: 1

Body:
{"tenant_id":"a1b2c3d4-...","realm_id":"...","slug":"acme","display_name":"Acme Corp"}

Event Catalog

Realm Events

realm.created

PropertyValue
Event typerealm.created
Routing keyrealm.realm.created
Aggregaterealm
Tenant scopedNo
Fires whenA new realm is created.

Payload fields:

FieldTypeDescription
IDstringUUID of the realm.
KeystringUnique key for the realm.
NamestringDisplay name of the realm.
CreatedAtstringRFC3339 timestamp of creation.

The payload is the JSON-serialized Realm struct. Field names use Go default casing (no JSON tags).


Tenant Events

tenant.created

PropertyValue
Event typetenant.created
Routing keytenant.tenant.created
Aggregatetenant
Tenant scopedYes
Fires whenA new tenant is created.

Payload fields:

FieldTypeDescription
tenant_idstringUUID of the created tenant.
realm_idstringUUID of the realm this tenant belongs to.
slugstringURL-safe tenant identifier.
display_namestringHuman-readable tenant name.

tenant.suspended

PropertyValue
Event typetenant.suspended
Routing keytenant.tenant.suspended
Aggregatetenant
Tenant scopedYes
Fires whenAn active tenant is suspended.

Payload fields:

FieldTypeDescription
tenant_idstringUUID of the suspended tenant.

tenant.reactivated

PropertyValue
Event typetenant.reactivated
Routing keytenant.tenant.reactivated
Aggregatetenant
Tenant scopedYes
Fires whenA suspended tenant is reactivated.

Payload fields:

FieldTypeDescription
tenant_idstringUUID of the reactivated tenant.

User Events

user.created

PropertyValue
Event typeuser.created
Routing keyuser.user.created
Aggregateuser
Tenant scopedNo
Fires whenA new global user is created.

Payload fields:

FieldTypePresenceDescription
user_idstringAlwaysUUID of the created user.
emailstringOptionalEmail address, if provided.
phone_e164stringOptionalPhone number in E.164 format, if provided.
display_namestringOptionalDisplay name, if provided.

user.suspended

PropertyValue
Event typeuser.suspended
Routing keyuser.user.suspended
Aggregateuser
Tenant scopedNo
Fires whenAn active user is suspended.

Payload fields:

FieldTypeDescription
user_idstringUUID of the suspended user.

user.reactivated

PropertyValue
Event typeuser.reactivated
Routing keyuser.user.reactivated
Aggregateuser
Tenant scopedNo
Fires whenA suspended user is reactivated.

Payload fields:

FieldTypeDescription
user_idstringUUID of the reactivated user.

Membership Events

membership.created

PropertyValue
Event typemembership.created
Routing keymembership.membership.created
Aggregatemembership
Tenant scopedYes
Fires whenA user is added to a tenant (directly or via invitation acceptance).

Payload fields:

FieldTypeDescription
membership_idstringUUID of the created membership.
tenant_idstringUUID of the tenant.
user_idstringUUID of the user.

membership.suspended

PropertyValue
Event typemembership.suspended
Routing keymembership.membership.suspended
Aggregatemembership
Tenant scopedYes
Fires whenAn active membership is suspended.

Payload fields:

FieldTypeDescription
membership_idstringUUID of the suspended membership.

membership.reactivated

PropertyValue
Event typemembership.reactivated
Routing keymembership.membership.reactivated
Aggregatemembership
Tenant scopedYes
Fires whenA suspended membership is reactivated.

Payload fields:

FieldTypeDescription
membership_idstringUUID of the reactivated membership.

Role and Permission Events

role.created

PropertyValue
Event typerole.created
Routing keyrole.role.created
Aggregaterole
Tenant scopedYes
Fires whenA new role is created within a tenant.

Payload fields:

FieldTypeDescription
role_idstringUUID of the created role.
tenant_idstringUUID of the tenant that owns this role.
keystringUnique key for the role within the tenant.
namestringHuman-readable role name.

permission.created

PropertyValue
Event typepermission.created
Routing keypermission.permission.created
Aggregatepermission
Tenant scopedNo
Fires whenA new global permission is created.

Payload fields:

FieldTypePresenceDescription
IDstringAlwaysUUID of the permission.
KeystringAlwaysUnique permission key.
DescriptionstringOptionalDescription of the permission (null if not set).
CreatedAtstringAlwaysRFC3339 timestamp of creation.

The payload is the JSON-serialized Permission struct. Field names use Go default casing (no JSON tags).


user.role.assigned

PropertyValue
Event typeuser.role.assigned
Routing keymembership.user.role.assigned
Aggregatemembership
Tenant scopedYes
Fires whenA role is assigned to a membership.

Payload fields:

FieldTypeDescription
assignment_idstringUUID of the role assignment record.
membership_idstringUUID of the membership receiving the role.
role_idstringUUID of the assigned role.

user.role.unassigned

PropertyValue
Event typeuser.role.unassigned
Routing keymembership.user.role.unassigned
Aggregatemembership
Tenant scopedYes
Fires whenA role is removed from a membership.

Payload fields:

FieldTypeDescription
membership_idstringUUID of the membership losing the role.
role_idstringUUID of the unassigned role.

Invitation Events

user.invited

PropertyValue
Event typeuser.invited
Routing keyinvitation.user.invited
Aggregateinvitation
Tenant scopedYes
Fires whenA new invitation is created for a user to join a tenant.

Payload fields:

FieldTypeDescription
invitation_idstringUUID of the invitation.
tenant_idstringUUID of the tenant the user is invited to.
emailstringEmail address of the invitee.
tokenstringRaw invitation token (for email delivery).
expires_atstringRFC3339 timestamp of when the invitation expires.

Security note: The token field contains the raw invitation token. This event should only be consumed by the notification/email service. Do not log or persist this value beyond its intended use.


invitation.accepted

PropertyValue
Event typeinvitation.accepted
Routing keyinvitation.invitation.accepted
Aggregateinvitation
Tenant scopedYes
Fires whenAn invitation is accepted by a user. A membership.created event is also emitted in the same transaction.

Payload fields:

FieldTypeDescription
invitation_idstringUUID of the accepted invitation.
user_idstringUUID of the user who accepted.

invitation.revoked

PropertyValue
Event typeinvitation.revoked
Routing keyinvitation.invitation.revoked
Aggregateinvitation
Tenant scopedYes
Fires whenA pending invitation is revoked.

Payload fields:

FieldTypeDescription
invitation_idstringUUID of the revoked invitation.

API Key Events

api_key.created

PropertyValue
Event typeapi_key.created
Routing keyapi_key.api_key.created
Aggregateapi_key
Tenant scopedNo
Fires whenA new database-backed API key is created.

Payload fields:

FieldTypeDescription
api_key_idstringUUID of the created API key.
namestringHuman-readable name of the key.
key_prefixstringFirst 8 characters of the raw key (for identification).

Note: The raw key is never included in events.


api_key.revoked

PropertyValue
Event typeapi_key.revoked
Routing keyapi_key.api_key.revoked
Aggregateapi_key
Tenant scopedNo
Fires whenAn active API key is revoked. Not emitted if the key was already revoked.

Payload fields:

FieldTypeDescription
api_key_idstringUUID of the revoked API key.
namestringHuman-readable name of the key.

Event Summary Table

#Event TypeRouting KeyAggregateTenant Scoped
1realm.createdrealm.realm.createdrealmNo
2tenant.createdtenant.tenant.createdtenantYes
3tenant.suspendedtenant.tenant.suspendedtenantYes
4tenant.reactivatedtenant.tenant.reactivatedtenantYes
5user.createduser.user.createduserNo
6user.suspendeduser.user.suspendeduserNo
7user.reactivateduser.user.reactivateduserNo
8membership.createdmembership.membership.createdmembershipYes
9membership.suspendedmembership.membership.suspendedmembershipYes
10membership.reactivatedmembership.membership.reactivatedmembershipYes
11role.createdrole.role.createdroleYes
12permission.createdpermission.permission.createdpermissionNo
13user.role.assignedmembership.user.role.assignedmembershipYes
14user.role.unassignedmembership.user.role.unassignedmembershipYes
15user.invitedinvitation.user.invitedinvitationYes
16invitation.acceptedinvitation.invitation.acceptedinvitationYes
17invitation.revokedinvitation.invitation.revokedinvitationYes
18api_key.createdapi_key.api_key.createdapi_keyNo
19api_key.revokedapi_key.api_key.revokedapi_keyNo

Consumer Example

The following Go program connects to RabbitMQ, declares a durable queue, binds to all IAM events, and processes them with manual acknowledgement and deduplication.

package main

import (
"encoding/json"
"log"
"os"
"os/signal"
"sync"
"syscall"

amqp "github.com/rabbitmq/amqp091-go"
)

func headerString(headers amqp.Table, key string) string {
v, ok := headers[key]
if !ok {
return ""
}
s, _ := v.(string)
return s
}

func main() {
rabbitURL := os.Getenv("RABBITMQ_URL")
if rabbitURL == "" {
rabbitURL = "amqp://guest:guest@localhost:5672/"
}

conn, err := amqp.Dial(rabbitURL)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open channel: %v", err)
}
defer ch.Close()

// Declare the exchange (idempotent -- safe to call even if it already exists)
err = ch.ExchangeDeclare(
"iam.events", // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare exchange: %v", err)
}

// Declare a durable queue for this consumer
q, err := ch.QueueDeclare(
"my-service-iam-consumer", // name -- unique per consuming service
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare queue: %v", err)
}

// Bind to all IAM events. Use a more specific pattern to filter:
// "tenant.*" -- all tenant events
// "membership.membership.*" -- membership lifecycle events
// "membership.user.role.*" -- role assignment events
err = ch.QueueBind(
q.Name, // queue name
"#", // routing key pattern
"iam.events", // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to bind queue: %v", err)
}

// Use manual acknowledgement (autoAck: false)
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer tag (auto-generated)
false, // autoAck -- set to false for manual ack
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to register consumer: %v", err)
}

log.Printf("Consumer started. Listening on queue '%s'", q.Name)

// Graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
for msg := range msgs {
eventType := headerString(msg.Headers, "event_type")
aggregateID := headerString(msg.Headers, "aggregate_id")
tenantID := headerString(msg.Headers, "tenant_id")

log.Printf("Received event: type=%s aggregate_id=%s tenant_id=%s message_id=%s",
eventType, aggregateID, tenantID, msg.MessageId)

// --- Deduplication ---
// Use msg.MessageId to check if this event was already processed.
// Store processed MessageIds in your database within the same
// transaction as your side effects.

// --- Handle by event type ---
switch eventType {
case "tenant.created":
var payload struct {
TenantID string `json:"tenant_id"`
RealmID string `json:"realm_id"`
Slug string `json:"slug"`
DisplayName string `json:"display_name"`
}
if err := json.Unmarshal(msg.Body, &payload); err != nil {
log.Printf("Failed to unmarshal payload: %v", err)
_ = msg.Nack(false, false) // discard malformed messages
continue
}
log.Printf("Tenant created: %s (%s)", payload.DisplayName, payload.Slug)

// Handle other event types ...

default:
log.Printf("Unhandled event type: %s", eventType)
}

// Acknowledge after successful processing
if err := msg.Ack(false); err != nil {
log.Printf("Failed to ack message: %v", err)
}
}
}()

<-sigChan
log.Println("Shutting down consumer...")

// Cancel the consumer so the msgs channel closes
_ = ch.Cancel("", false)
wg.Wait()

log.Println("Consumer stopped.")
}

Consumer Best Practices

Idempotency

Delivery is at-least-once. The same event will be delivered more than once during retries, rebalances, or publisher relay restarts.

  • Use the MessageId AMQP property (a UUID) to deduplicate.
  • Store processed MessageId values in your database within the same transaction as your side effects.
  • A simple approach: maintain a processed_events table with a unique constraint on message_id, and INSERT ... ON CONFLICT DO NOTHING before processing.
CREATE TABLE processed_events (
message_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

Error Handling

  • Transient errors (database timeouts, network blips): Nack with requeue: true. The message will be redelivered.
  • Permanent errors (malformed payload, unknown event type from an older schema): Nack with requeue: false. The message goes to a dead-letter queue if configured, or is discarded.
  • Never silently drop messages. Log every Nack with the MessageId and the reason.

Acknowledgement Strategy

  • Use manual acknowledgement (autoAck: false).
  • Ack only after your side effects are committed.
  • If your processing crashes before Ack, the message is automatically redelivered -- this is the intended behavior.
  • Avoid long-running processing in the message handler. If you need to do heavy work, persist the event and process it asynchronously.

Graceful Shutdown

  • Catch SIGINT and SIGTERM.
  • Cancel the AMQP consumer so the delivery channel closes.
  • Wait for in-flight message processing to complete before closing the connection.
  • Unacknowledged messages will be redelivered to another consumer (or the same one on restart).

Schema Evolution

  • The schema_version header indicates the payload schema version (currently 1).
  • When consuming events, check the version and handle unknown versions gracefully (log and skip, or Nack without requeue).
  • New fields may be added to payloads in a backward-compatible way. Use lenient JSON deserialization and do not fail on unknown fields.

Ordering

  • RabbitMQ does not guarantee strict global ordering across multiple publishers or queues.
  • Events for the same aggregate are published sequentially by the outbox relay, but network conditions may cause reordering.
  • Do not rely on event ordering for correctness. Use the occurred_at header and aggregate version checks if ordering matters for your use case.