Etrans C4

Email Transactional - System Overview

Architecture documentation following the C4 model — zoom from high-level context down to component details.


C4 Level 1: System Context Diagram

Who uses the system and what external systems does it depend on?

graph TB
    classDef person fill:#08427b,stroke:#052e56,color:#fff
    classDef system fill:#1168bd,stroke:#0b4884,color:#fff
    classDef external fill:#999,stroke:#6b6b6b,color:#fff

    Developer["👤 Developer / App<br><i>Sends transactional emails<br>via API or SMTP</i>"]:::person
    Admin["👤 Admin<br><i>Manages domains, API keys,<br>billing subscriptions</i>"]:::person

    ETrans["📧 Email Transactional<br>System<br><i>Accepts email requests,<br>delivers via SMTP,<br>tracks delivery status</i>"]:::system

    BizFlyAuth["BizFly Auth<br><i>JWT token validation<br>& user lookup</i>"]:::external
    BillingAPI["Billing API<br><i>Package pricing,<br>subscription management</i>"]:::external
    RecipientInbox["Recipient Inbox<br><i>Gmail, Outlook, etc.</i>"]:::external
    Sentry["Sentry<br><i>Error tracking</i>"]:::external

    Developer -->|"POST /emails<br>(API Key)"| ETrans
    Admin -->|"Bearer token<br>(domains, billing, keys)"| ETrans
    ETrans -->|"Validate tokens"| BizFlyAuth
    ETrans -->|"Check pricing & balance"| BillingAPI
    ETrans -->|"Deliver emails via SMTP"| RecipientInbox
    ETrans -->|"Report errors"| Sentry

C4 Level 2: Container Diagram

What are the running applications, data stores, and how do they communicate?

graph TB
    classDef person fill:#08427b,stroke:#052e56,color:#fff
    classDef container fill:#438dd5,stroke:#2e6295,color:#fff
    classDef database fill:#438dd5,stroke:#2e6295,color:#fff
    classDef queue fill:#d4a017,stroke:#a07b10,color:#fff
    classDef external fill:#999,stroke:#6b6b6b,color:#fff

    Developer["👤 Developer / App"]:::person
    Admin["👤 Admin"]:::person

    subgraph EmailTransactional ["Email Transactional System"]
        API["API Server<br><i>Python 3.10 / FastAPI</i><br><i>:8000</i>"]:::container
        SendmailWorker["Sendmail Worker<br><i>Go 1.24</i><br><i>Kafka consumer →<br>SMTP delivery</i>"]:::container
        FetchLogWorker["Fetch Log Worker<br><i>Go 1.24</i><br><i>Kafka consumer →<br>MongoDB storage</i>"]:::container

        BillingDB[("Billing DB<br><i>PostgreSQL</i><br><i>READ-ONLY</i><br><i>domains, dkim,<br>mkt_domain, users</i>")]:::database
        UsendDB[("USEND DB<br><i>PostgreSQL</i><br><i>READ/WRITE</i><br><i>emails, api_keys,<br>packs, subscriptions</i>")]:::database
        MongoDB[("MongoDB<br><i>logs_db</i><br><i>Transaction logs</i>")]:::database
        Redis[("Redis<br><i>SMTP credential cache,<br>API key cache</i>")]:::database

        SendmailTopic["Kafka: sendmail topic"]:::queue
        LogTopic["Kafka: email-logs topic"]:::queue
    end

    SMTP["SMTP Server<br><i>Postfix</i>"]:::external
    Filebeat["Filebeat<br><i>Log shipper</i>"]:::external

    Developer -->|"POST /emails<br>(X-API-KEY)"| API
    Admin -->|"Bearer token<br>(manage resources)"| API

    API -->|"read domains,<br>quotas"| BillingDB
    API -->|"read/write emails,<br>keys, packs"| UsendDB
    API -->|"read delivery logs"| MongoDB
    API -->|"cache/read<br>SMTP creds"| Redis
    API -->|"publish email<br>payload (JSON)"| SendmailTopic

    SendmailTopic -->|"consume"| SendmailWorker
    SendmailWorker -->|"fetch & decrypt<br>SMTP creds"| Redis
    SendmailWorker -->|"send email<br>(STARTTLS)"| SMTP

    SMTP -->|"write mail logs"| Filebeat
    Filebeat -->|"publish structured<br>log events"| LogTopic

    LogTopic -->|"consume"| FetchLogWorker
    FetchLogWorker -->|"filter & insert<br>log documents"| MongoDB

C4 Level 3: Component Diagrams

API Server Components

What are the internal modules of the Python API?

graph TB
    classDef component fill:#85bbf0,stroke:#5d82a8,color:#000
    classDef database fill:#438dd5,stroke:#2e6295,color:#fff
    classDef queue fill:#d4a017,stroke:#a07b10,color:#fff
    classDef external fill:#999,stroke:#6b6b6b,color:#fff

    Client["👤 Client"]

    subgraph API ["API Server (Python / FastAPI)"]

        subgraph Routes ["API Layer (src/api/routes/)"]
            EmailRoutes["Email Routes<br><i>/emails, /emails/batch</i>"]:::component
            DomainRoutes["Domain Routes<br><i>/domains, /domains/{id}</i>"]:::component
            BillingRoutes["Billing Routes<br><i>/billing/subscription,<br>/billing/payg</i>"]:::component
            ApiKeyRoutes["API Key Routes<br><i>/api-keys</i>"]:::component
            LogRoutes["Log Routes<br><i>/logs</i>"]:::component
            SenderRoutes["Sender Routes<br><i>/senders</i>"]:::component
            TestRoutes["Testing Routes<br><i>/testing/set-pack</i>"]:::component
        end

        Auth["Auth Middleware<br><i>Bearer token validation,<br>API key lookup,<br>permission check</i>"]:::component
        CommandBus["Command Bus<br><i>Routes commands to<br>use cases</i>"]:::component

        subgraph UseCases ["Application Layer (src/app/use_cases/)"]
            SendMail["SendMailUseCase<br><i>Validate, cache creds,<br>publish to Kafka</i>"]:::component
            CreateDomain["CreateDomainUseCase<br><i>Check limit, generate<br>DKIM, sync to prod</i>"]:::component
            VerifyDomain["VerifyDomainUseCase<br><i>DNS verification</i>"]:::component
            CreateSub["CreateSubscriptionUseCase<br><i>Assign pack to domain</i>"]:::component
            ManageKeys["API Key UseCases<br><i>Create, List, Delete</i>"]:::component
            SetPack["SetPackUseCase<br><i>Testing: assign pack<br>to all domains</i>"]:::component
        end

        subgraph Adapters ["Adapter Layer (src/adapters/)"]
            UoW["Unit of Work<br><i>Transaction management,<br>12 repositories</i>"]:::component
            KafkaPublisher["Kafka Publisher<br><i>Publish email payloads</i>"]:::component
            RedisCache["Redis Client<br><i>Credential & key caching</i>"]:::component
            BillingSync["Billing Sync Service<br><i>Sync to prod billing DB</i>"]:::component
        end
    end

    BillingDB[("Billing DB")]:::database
    UsendDB[("USEND DB")]:::database
    MongoDB[("MongoDB")]:::database
    Redis[("Redis")]:::database
    Kafka["Kafka: sendmail"]:::queue
    BizFlyAuth["BizFly Auth"]:::external

    Client --> Auth
    Auth -->|"validate token"| BizFlyAuth
    Auth --> Routes
    Routes --> CommandBus
    CommandBus --> UseCases

    SendMail --> UoW
    SendMail --> KafkaPublisher
    SendMail --> RedisCache
    CreateDomain --> UoW
    CreateDomain --> BillingSync
    CreateSub --> UoW
    ManageKeys --> UoW
    SetPack --> UoW

    UoW --> BillingDB
    UoW --> UsendDB
    LogRoutes -->|"read logs"| MongoDB
    KafkaPublisher --> Kafka
    RedisCache --> Redis

Sendmail Worker Components

graph TB
    classDef component fill:#85bbf0,stroke:#5d82a8,color:#000
    classDef external fill:#999,stroke:#6b6b6b,color:#fff
    classDef queue fill:#d4a017,stroke:#a07b10,color:#fff

    Kafka["Kafka: sendmail topic"]:::queue

    subgraph Worker ["Sendmail Worker (Go 1.24)"]
        Consumer["Kafka Consumer<br><i>Sarama ConsumerGroup<br>Manual offset commit<br>Back-pressure via Done channel</i>"]:::component
        Pool["Worker Pool<br><i>10 goroutines (configurable)<br>Buffered channel (cap 100)</i>"]:::component
        EmailService["Email Service<br><i>Build MIME message<br>5-min timeout per email<br>5 retries, exponential backoff</i>"]:::component
        RedisClient["Redis Client<br><i>Fetch hash 'auth' by user_id<br>10 retries on connect</i>"]:::component
        Crypto["AES-256-CBC Decryptor<br><i>Decrypt SMTP credentials<br>TOKEN_SECRET + TOKEN_IV</i>"]:::component
    end

    Redis[("Redis")]:::external
    SMTP["SMTP Server"]:::external

    Kafka --> Consumer
    Consumer -->|"Task{EmailPayload, Done}"| Pool
    Pool --> EmailService
    EmailService --> RedisClient
    RedisClient -->|"base64 encrypted creds"| Redis
    RedisClient --> Crypto
    Crypto -->|"email + password"| EmailService
    EmailService -->|"STARTTLS / go-mail"| SMTP

Fetch Log Worker Components

graph TB
    classDef component fill:#85bbf0,stroke:#5d82a8,color:#000
    classDef external fill:#999,stroke:#6b6b6b,color:#fff
    classDef queue fill:#d4a017,stroke:#a07b10,color:#fff
    classDef database fill:#438dd5,stroke:#2e6295,color:#fff

    Kafka["Kafka: email-logs topic"]:::queue

    subgraph Worker ["Fetch Log Worker (Go 1.24)"]
        Consumer["Kafka Consumer<br><i>Sarama ConsumerGroup<br>Offset: earliest<br>Back-pressure via Done channel</i>"]:::component
        Pool["Worker Pool<br><i>10 goroutines (configurable)<br>Buffered channel (cap 100)</i>"]:::component
        EventFilter["Event Filter<br><i>Accept: Sender, Queue,<br>Deferred, Reject<br>Skip: bizfly.vn pattern</i>"]:::component
        MongoClient["MongoDB Client<br><i>Insert log documents<br>Auto-create indexes</i>"]:::component
        SentryReporter["Sentry Reporter<br><i>Custom HTTP client<br>Tags: topic, partition, offset</i>"]:::component
    end

    MongoDB[("MongoDB<br><i>Indexes:<br>from_domain + timestamp<br>message_id<br>event + timestamp</i>")]:::database
    Sentry["Sentry"]:::external

    Kafka --> Consumer
    Consumer -->|"Task{raw message}"| Pool
    Pool --> EventFilter
    EventFilter -->|"accepted events"| MongoClient
    EventFilter -->|"dropped"| Drop["Silent drop"]
    MongoClient --> MongoDB
    Pool -->|"errors"| SentryReporter
    SentryReporter --> Sentry

Data Flow: Send Email (Sequence)

sequenceDiagram
    participant C as Client
    participant API as API Server
    participant PG as PostgreSQL
    participant R as Redis
    participant K1 as Kafka (sendmail)
    participant SW as Sendmail Worker
    participant SMTP as SMTP Server
    participant FB as Filebeat
    participant K2 as Kafka (email-logs)
    participant FLW as Fetch Log Worker
    participant M as MongoDB

    C->>API: POST /emails
    API->>PG: Check quotas (mkt_domain)
    API->>R: Cache SMTP credentials (AES encrypted)
    API->>PG: Save EmailMessage (status: queued)
    API->>K1: Publish email payload
    API-->>C: 200 OK (message_id)

    K1->>SW: Consume message
    SW->>R: Fetch SMTP creds (hash "auth")
    SW->>SW: Decrypt AES-256-CBC
    SW->>SMTP: Send email (retry 5x)

    SMTP->>FB: Delivery logs
    FB->>K2: Publish log event

    K2->>FLW: Consume event
    FLW->>FLW: Filter (Sender/Queue/Deferred/Reject)
    FLW->>M: Insert log document

    C->>API: GET /logs
    API->>M: Query logs
    API-->>C: Mapped status labels

Three Services

| Service | Language | Port | Role |
| --- | --- | --- | --- |
| API Server | Python 3.10 / FastAPI | 8000 | HTTP endpoints, business logic, publishes to Kafka |
| Sendmail Worker | Go 1.24 | - | Consumes email requests from Kafka, delivers via SMTP |
| Fetch Log Worker | Go 1.24 | - | Consumes SMTP delivery logs from Kafka, stores in MongoDB |

End-to-End Flow: Sending an Email

| Step | Component | Actions |
| --- | --- | --- |
| 1 | Client | Sends POST /emails with email payload |
| 2 | API | Validates auth (Bearer token or API key) |
| | | Checks quotas (daily/hourly from mkt_domain in Billing DB) |
| | | Caches SMTP credentials to Redis (AES-256-CBC encrypted) |
| | | Saves EmailMessage record to USEND DB (status: queued) |
| | | Publishes email payload to Kafka sendmail topic |
| 3 | Sendmail Worker | Consumes message from Kafka |
| | | Fetches SMTP credentials from Redis (hash auth, key=user_id) |
| | | Decrypts credentials with AES-256-CBC |
| | | Sends email via SMTP (go-mail library, 5 retries, exponential backoff) |
| 4 | SMTP Server | Delivers the email, writes delivery logs to disk |
| 5 | Filebeat | Parses SMTP logs, publishes structured events to Kafka email-logs topic |
| 6 | Fetch Log Worker | Consumes log event from Kafka |
| | | Filters by event type (Sender/Queue/Deferred/Reject) |
| | | Skips internal messages (bizfly.vn pattern) |
| | | Inserts log document into MongoDB |
| 7 | Client | Queries GET /logs to see delivery status |
| | API | Reads from MongoDB, maps raw statuses to display labels |

Shared Infrastructure

| Infrastructure | Who Writes | Who Reads | Shared Config |
| --- | --- | --- | --- |
| Kafka (sendmail topic) | API (Python) | Sendmail Worker (Go) | KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC |
| Kafka (email-logs topic) | Filebeat (external) | Fetch Log Worker (Go) | KAFKA_BOOTSTRAP_SERVERS, LOG_TOPIC |
| Redis | API (caches SMTP creds) | Sendmail Worker (decrypts creds) | REDIS_URI, TOKEN_SECRET, TOKEN_IV |
| MongoDB | Fetch Log Worker | API (reads logs) | MONGODB_URI, MONGODB_DBNAME |
| Billing DB (PostgreSQL) | External / Billing Sync | API (read-only queries) | DB_URI |
| USEND DB (PostgreSQL) | API | API | USEND_DB_URI |

API Service (Python)

Tech Stack

  • Python 3.10, FastAPI, SQLAlchemy 2.0, Pydantic
  • Clean Architecture: Domain -> App -> Adapters -> API
  • Command Bus pattern for all use cases

Dual Database Architecture

| Database | Config | Access | Tables |
| --- | --- | --- | --- |
| Billing DB | DB_URI | READ-ONLY | domain, dkim, mkt_domain, users, temporary_password |
| USEND DB | USEND_DB_URI | Read/Write | api_keys, email_messages, email_transaction_pack, domain_subscription_status, user_pack, domain_tracking_settings, transport |

CRITICAL: Never run migrations or modify schema on the Billing DB. It is managed externally.

Authentication

| Method | Header | Permission |
| --- | --- | --- |
| Bearer Token | Authorization: Bearer <JWT> | Always full_access |
| API Key | X-API-KEY: <key> | Depends on key's configured level |
| Cookie | Session cookie | Passthrough |

Permission Hierarchy: read_only < sending_access < full_access (higher includes lower)
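
The hierarchy above can be modeled as an ordered enum, so that "higher includes lower" becomes a simple comparison. This is a minimal sketch, not the code in the API; the names Permission, parse_permission, and is_allowed are hypothetical.

```python
from enum import IntEnum


class Permission(IntEnum):
    """Permission levels, ordered so that a higher value includes the lower ones."""

    READ_ONLY = 1
    SENDING_ACCESS = 2
    FULL_ACCESS = 3


def parse_permission(name: str) -> Permission:
    # Map the API's lowercase names (e.g. "sending_access") to enum members.
    return Permission[name.upper()]


def is_allowed(granted: str, required: str) -> bool:
    """Return True when the granted level meets or exceeds the required level."""
    return parse_permission(granted) >= parse_permission(required)
```

For example, a key with sending_access passes a read_only check but fails a full_access check.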

API Endpoints Quick Reference

| Group | Method | Path | Auth | Description |
| --- | --- | --- | --- | --- |
| Emails | POST | /emails | sending_access | Send email |
| Emails | POST | /emails/batch | sending_access | Send batch emails |
| Emails | GET | /emails | read_only | List emails (paginated) |
| Emails | GET | /emails/{message_id} | read_only | Get email details |
| Logs | GET | /logs | read_only | Get transaction logs |
| Domains | POST | /domains | full_access | Create domain |
| Domains | GET | /domains | read_only | List domains |
| Domains | GET | /domains/{domain_id} | read_only | Get domain details |
| Domains | GET | /domains/{domain_id}/quota | read_only | Get domain quota |
| Domains | POST | /domains/{domain_id}/verify | full_access | Verify domain DNS |
| Domains | PATCH | /domains/{domain_id} | full_access | Update tracking |
| Domains | DELETE | /domains/{domain_id} | full_access | Delete domain |
| API Keys | POST | /api-keys | full_access | Create API key |
| API Keys | GET | /api-keys | read_only | List API keys |
| API Keys | GET | /api-keys/{key_id} | read_only | Get API key |
| API Keys | DELETE | /api-keys/{key_id} | full_access | Delete API key |
| Billing | GET | /billing/subscription/packs | read_only | List packages |
| Billing | GET | /billing/subscription/current-pack | read_only | Get current pack |
| Billing | POST | /billing/subscription/subs | full_access | Create subscription |
| Billing | GET | /billing/payg/packages | read_only | Get PAYG packages |
| Billing | POST | /billing/payg/subscriptions | full_access | Create PAYG sub |
| Billing | GET | /billing/payg/subscriptions/{domain_id} | read_only | Get current PAYG sub |
| Billing | DELETE | /billing/payg/subscriptions/{domain_id} | full_access | Cancel PAYG sub |
| Senders | POST | /senders | full_access | Create SMTP sender |
| Testing | POST | /testing/set-pack | full_access | Set pack for testing |
| Health | GET | /healthz | none | Health check |

Detailed API Reference

Emails

POST /emails -- Send Email

Permission: sending_access

Request Body:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| from | string | Yes | "email@domain.com" or "Name <email@domain.com>" |
| to | string[] | Yes | Recipient email addresses |
| subject | string | Yes | Email subject |
| html | string | No* | HTML body (*at least one of html or text required) |
| text | string | No* | Plain text body |
| cc | string[] | No | CC recipients |
| bcc | string[] | No | BCC recipients |
| reply_to | string | No | Reply-to address |
| attachments | object[] | No | Attachments (see below) |
| headers | object | No | Custom email headers |
| tags | object[] | No | Tags (name/value pairs) |
| template_id | string | No | Template identifier |
| template_data | object | No | Template rendering data |

Attachment object:

| Field | Type | Required |
| --- | --- | --- |
| filename | string | Yes |
| content | string | Yes (base64-encoded) |
| content_type | string | Yes |

Response (200):

{
  "id": "<message-id>",
  "from": "sender@example.com",
  "to": ["recipient@example.com"],
  "status": "queued",
  "created_at": "2026-03-10T04:16:54Z"
}

Errors: 400 (no body, invalid attachment, size > 25MB, quota exceeded, domain not verified) | 401 | 403
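
A client call to this endpoint might look like the sketch below, which builds the request without sending it. The base URL and the function name build_send_email_request are assumptions for illustration; the document only states the path, the X-API-KEY header, and port 8000.

```python
import json
import urllib.request
from typing import List, Optional

# Hypothetical base URL; the document only states that the API listens on port 8000.
BASE_URL = "http://localhost:8000"


def build_send_email_request(api_key: str, sender: str, to: List[str], subject: str,
                             html: Optional[str] = None,
                             text: Optional[str] = None) -> urllib.request.Request:
    """Build (but do not send) a POST /emails request authenticated by API key."""
    if html is None and text is None:
        # The API requires at least one of html or text, so fail early on the client.
        raise ValueError("at least one of html or text is required")
    payload = {"from": sender, "to": to, "subject": subject}
    if html is not None:
        payload["html"] = html
    if text is not None:
        payload["text"] = text
    return urllib.request.Request(
        url=f"{BASE_URL}/emails",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-API-KEY": api_key},
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen would perform the call; the key needs at least sending_access.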


POST /emails/batch -- Send Batch Emails

Permission: sending_access

Request Body:

| Field | Type | Required |
| --- | --- | --- |
| emails | SendEmailRequest[] | Yes (same schema as single send) |

Response (200):

{
  "data": [{"id": "<message-id>"}],
  "errors": [{"index": 2, "error": "Domain not verified"}]
}

Per-email errors are returned in the errors array, not as HTTP errors.
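
Because a batch call can succeed at the HTTP level while individual emails fail, a client has to inspect the errors array. The sketch below pairs each error with the email that caused it. It assumes that index is the zero-based position in the submitted emails array, which the document does not state; split_batch_result is a hypothetical helper name.

```python
from typing import List, Tuple


def split_batch_result(emails: List[dict], response: dict) -> Tuple[list, list]:
    """Separate accepted message IDs from per-email failures.

    Returns (accepted_ids, failures), where each failure is an
    (index, email, error) tuple built from the response's "errors" array.
    """
    accepted_ids = [item["id"] for item in response.get("data", [])]
    failures = [
        # Assumption: "index" is the zero-based position in the request's emails array.
        (err["index"], emails[err["index"]], err["error"])
        for err in response.get("errors", [])
    ]
    return accepted_ids, failures
```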


GET /emails -- List Emails

Permission: read_only

Query Parameters:

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| page | int | 1 | Page number (min: 1) |
| per_page | int | 20 | Items per page (1-100) |
| status | string | - | Filter: queued,sent,delivered,bounced,deferred,delayed (comma-separated) |
| created_at_from | string | - | ISO-8601 UTC lower bound |
| created_at_to | string | - | ISO-8601 UTC upper bound |

Response (200):

{
  "data": [
    {
      "id": "msg_abc123",
      "from": "sender@example.com",
      "to": ["recipient@example.com"],
      "subject": "Hello",
      "status": "Delivered",
      "created_at": "2026-03-10T04:16:54Z"
    }
  ],
  "total": 42,
  "page": 1,
  "per_page": 20,
  "stats": [
    {"label": "Delivered", "count": 30, "percentage": 71.4},
    {"label": "Bounced", "count": 12, "percentage": 28.6}
  ]
}

Notes: Status values in response are display labels (Delivered, Delayed, Bounced, Sent, Queued). Filter accepts both raw (deferred) and display (delayed) names.

Errors: 400 (invalid status, invalid timestamp, from > to, per_page out of range)
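
The documented 400 conditions can be checked on the client before the request is made. The following is a sketch under that assumption; build_list_emails_query is a hypothetical helper, and the server remains the authority on what it accepts.

```python
from datetime import datetime, timezone
from typing import Optional, Sequence
from urllib.parse import urlencode

# Status names listed for the filter (raw and display names, lowercase).
ALLOWED_STATUSES = {"queued", "sent", "delivered", "bounced", "deferred", "delayed"}


def _iso_utc(value: datetime) -> str:
    # Render a timezone-aware datetime as an ISO-8601 UTC timestamp.
    return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_list_emails_query(page: int = 1, per_page: int = 20,
                            status: Optional[Sequence[str]] = None,
                            created_at_from: Optional[datetime] = None,
                            created_at_to: Optional[datetime] = None) -> str:
    """Validate the filters client-side and return the GET /emails query string."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if not 1 <= per_page <= 100:
        raise ValueError("per_page must be between 1 and 100")
    params = {"page": page, "per_page": per_page}
    if status:
        unknown = sorted(set(status) - ALLOWED_STATUSES)
        if unknown:
            raise ValueError(f"invalid status: {unknown}")
        params["status"] = ",".join(status)
    if created_at_from and created_at_to and created_at_from > created_at_to:
        raise ValueError("created_at_from must not be later than created_at_to")
    if created_at_from:
        params["created_at_from"] = _iso_utc(created_at_from)
    if created_at_to:
        params["created_at_to"] = _iso_utc(created_at_to)
    return urlencode(params)
```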


GET /emails/{message_id} -- Get Email

Permission: read_only

Response (200):

{
  "id": "msg_abc123",
  "from": "sender@example.com",
  "to": ["recipient@example.com"],
  "subject": "Hello",
  "html": "<html>...</html>",
  "status": "Delivered",
  "created_at": "2026-03-10T04:16:54Z",
  "delivered_at": "2026-03-10T04:16:58Z"
}

Errors: 404 (not found)


Logs

GET /logs -- Get Transaction Logs

Permission: read_only

Query Parameters:

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| page | int | 1 | Page number |
| per_page | int | 20 | Items per page (1-100) |
| log_before | string | - | Filter before timestamp (ISO-8601) |
| log_after | string | - | Filter after timestamp (ISO-8601) |
| action | string | - | Filter by raw action (delivered, deferred, rejected) |
| sender | string | - | Filter by sender email |
| recipient | string | - | Filter by recipient email |
| message_id | string | - | Filter by message ID |
| source | string | - | Filter: api, smtp, or comma-separated |
| sorts | string | -timestamp | Sort field (- prefix = descending) |

Response (200):

{
  "logs": [
    {
      "event": "Sender",
      "timestamp": "2026-03-10T04:16:54Z",
      "status": "Delivered",
      "action": "250 2.0.0 OK",
      "action_detail": "250 2.0.0 OK 1772679259 xyz",
      "sender": "hello@example.com",
      "recipient": "user@example.com",
      "from_domain": "example.com",
      "to_domain": "example.com",
      "message_id": "msg_abc123",
      "source": "API"
    }
  ],
  "total": 150,
  "page": 1,
  "per_page": 20,
  "stats": [
    {"label": "Delivered", "count": 100, "percentage": 66.7},
    {"label": "Bounced", "count": 30, "percentage": 20.0},
    {"label": "API", "count": 80, "percentage": 53.3},
    {"label": "SMTP", "count": 70, "percentage": 46.7}
  ]
}

Notes: status is the mapped display label. action is the short SMTP response (status code and text), and action_detail is the full SMTP response. source is "API" or "SMTP", computed by cross-referencing message_id with the USEND DB.

Errors: 400 (invalid source, per_page out of range)
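
The percentages in the stats array of the example response are consistent with each count divided by total, rounded to one decimal place (100 of 150 gives 66.7). A sketch of that calculation, assuming this is how the values are derived; build_stats is a hypothetical name:

```python
from typing import Dict, List


def build_stats(counts: Dict[str, int], total: int) -> List[dict]:
    """Turn per-label counts into stats entries with a one-decimal percentage."""
    if total <= 0:
        # Avoid dividing by zero when the query matched nothing.
        return []
    return [
        {"label": label, "count": count, "percentage": round(count * 100 / total, 1)}
        for label, count in counts.items()
    ]
```

Status labels and source labels are each measured against the same total, which is why the two groups in the example do not sum to 100 together.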


Domains

POST /domains -- Create Domain

Permission: full_access

Request Body:

| Field | Type | Required |
| --- | --- | --- |
| domain | string | Yes |

Response (200):

{
  "id": "dom_123",
  "domain": "example.com",
  "status": "pending",
  "dns_records": [
    {"type": "TXT", "name": "@", "value": "v=spf1 include:...", "purpose": "spf"},
    {"type": "MX", "name": "@", "value": "capmx-re30...", "priority": 5, "purpose": "mx"},
    {"type": "TXT", "name": "dkim._domainkey", "value": "v=DKIM1;...", "purpose": "dkim"}
  ],
  "created_at": "2026-03-10T04:16:54Z"
}

Errors: 400 (duplicate domain, domain limit reached)


GET /domains -- List Domains

Permission: read_only

Query Parameters:

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| page | int | 1 | Page number |
| per_page | int | 20 | Items per page (1-100) |
| search | string | "" | Search domain names |

Response (200):

{
  "data": [
    {"id": "dom_123", "domain": "example.com", "status": "verified", "created_at": "..."}
  ],
  "total": 3,
  "total_verified": 2,
  "page": 1,
  "per_page": 20
}

GET /domains/{domain_id} -- Get Domain

Permission: read_only | Path: domain_id in dom_123 format

Response (200):

{
  "id": "dom_123",
  "domain": "example.com",
  "status": "verified",
  "dns_records": [...],
  "verified_at": "2026-03-10T04:16:54Z"
}

Errors: 400 (invalid format) | 404 (not found)


GET /domains/{domain_id}/quota -- Get Domain Quota

Permission: read_only

Response (200):

{
  "domain": "example.com",
  "daily_limit": 500,
  "daily_used": 120,
  "daily_remaining": 380,
  "hourly_limit": 50,
  "hourly_used": 10,
  "hourly_remaining": 40,
  "monthly_limit": 15500,
  "monthly_used": 3600,
  "monthly_remaining": 11900,
  "pack_code": "DEDICATED_100K",
  "pack_name": "Dedicated 100K"
}

POST /domains/{domain_id}/verify -- Verify Domain

Permission: full_access | Request Body: None

Response (200):

{
  "id": "dom_123",
  "domain": "example.com",
  "status": "verified",
  "dns_records": [
    {"type": "TXT", "name": "@", "value": "...", "purpose": "spf", "verified": true},
    {"type": "MX", "name": "@", "value": "...", "purpose": "mx", "verified": true},
    {"type": "TXT", "name": "dkim._domainkey", "value": "...", "purpose": "dkim", "verified": false}
  ],
  "verified_at": "2026-03-10T04:16:54Z"
}

Errors: 400 (DNS verification failed) | 404


PATCH /domains/{domain_id} -- Update Domain Tracking

Permission: full_access

Request Body:

| Field | Type | Required |
| --- | --- | --- |
| click_tracking | bool | No |
| open_tracking | bool | No |

Response (200):

{
  "id": "dom_123",
  "domain": "example.com",
  "click_tracking": true,
  "open_tracking": false
}

DELETE /domains/{domain_id} -- Delete Domain

Permission: full_access | Request Body: None

Response (200):

{"id": "dom_123", "deleted": true}

API Keys

POST /api-keys -- Create API Key

Permission: full_access

Request Body:

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| name | string | Yes | - | Key name |
| permission | string | No | "full_access" | full_access, sending_access, or read_only |
| expires_in_days | int | No | null (never) | 7, 30, 60, 90, 180, or 365 |

Response (200):

{
  "id": "key_abc123",
  "name": "My API Key",
  "key": "etrans_live_abc123def456...",
  "permission": "full_access",
  "expires_at": "2026-06-10T04:16:54Z",
  "created_at": "2026-03-10T04:16:54Z"
}

Important: The key field is shown only once at creation. It cannot be retrieved later.


GET /api-keys -- List API Keys

Permission: read_only

Query Parameters: page (default: 1), per_page (default: 20, max: 100)

Response (200):

{
  "data": [
    {
      "id": "key_abc123",
      "name": "My API Key",
      "permission": "full_access",
      "expires_at": "2026-06-10T04:16:54Z",
      "is_expired": false,
      "created_at": "2026-03-10T04:16:54Z"
    }
  ],
  "total": 5,
  "page": 1,
  "per_page": 20
}

GET /api-keys/{key_id} -- Get API Key

Permission: read_only

Response (200):

{
  "id": "key_abc123",
  "name": "My API Key",
  "permission": "full_access",
  "expires_at": null,
  "is_expired": false,
  "active": true,
  "created_at": "2026-03-10T04:16:54Z",
  "updated_at": "2026-03-10T04:16:54Z"
}

DELETE /api-keys/{key_id} -- Delete API Key

Permission: full_access | Request Body: None

Response (200):

{"id": "key_abc123", "deleted": true}

Billing - Subscription

GET /billing/subscription/packs -- List Packages

Permission: read_only

Response (200):

{
  "success": true,
  "packages": [
    {
      "pack_code": "DEDICATED_100K",
      "pack_type": "dedicated_ip",
      "daily_email_limit": 100000,
      "price": 500000.0,
      "is_active": true
    }
  ]
}

GET /billing/subscription/current-pack -- Get Current Pack

Permission: read_only

Response (200):

{
  "pack_code": "DEDICATED_100K",
  "pack_name": "Dedicated 100K",
  "pack_type": "dedicated_ip",
  "daily_email_limit": 100000,
  "domain_limit": 5,
  "quota": {
    "daily_limit": 100000,
    "daily_used": 5000,
    "daily_remaining": 95000,
    "hourly_limit": 4166,
    "hourly_used": 200,
    "hourly_remaining": 3966,
    "monthly_limit": 3100000,
    "monthly_used": 150000,
    "monthly_remaining": 2950000
  },
  "domains": [
    {"domain_id": "dom_123", "domain_name": "example.com", "status": "verified"}
  ]
}

POST /billing/subscription/subs -- Create Subscription

Permission: full_access

Request Body:

| Field | Type | Required |
| --- | --- | --- |
| pack_code | string | Yes |
| domain_id | int | Yes |

Response (200):

{
  "success": true,
  "message": "Subscription created successfully",
  "domain_id": 123,
  "pack_code": "DEDICATED_100K"
}

Errors: 400 | 403 (not Owner, insufficient balance) | 404


Billing - Pay As You Go

GET /billing/payg/packages -- Get PAYG Package

Permission: read_only

Response (200):

{
  "pack_code": "PAYG_SHARED",
  "pack_name": "Pay As You Go",
  "pack_type": "shared_ip",
  "pricing": {}
}

POST /billing/payg/subscriptions -- Create PAYG Subscription

Permission: full_access

Request Body:

| Field | Type | Required |
| --- | --- | --- |
| domain_id | int | Yes |

Response (200):

{
  "success": true,
  "message": "PAYG subscription created",
  "subscription": {},
  "previous_pack": "DEDICATED_100K"
}

GET /billing/payg/subscriptions/{domain_id} -- Get Current PAYG Subscription

Permission: read_only

Response (200):

{"subscription": {}}

Errors: 404 (no PAYG subscription)


DELETE /billing/payg/subscriptions/{domain_id} -- Cancel PAYG Subscription

Permission: full_access

Response (200):

{"success": true, "message": "PAYG subscription cancelled"}

Senders

POST /senders -- Create Sender

Permission: full_access

Request Body:

| Field | Type | Required | Constraints |
| --- | --- | --- | --- |
| account | string | Yes | Email address (e.g., noreply@example.com) |
| domain | string | Yes | Domain name |
| password | string | Yes | min 8 characters |

Response (200):

{
  "id": "sender_123",
  "account": "noreply@example.com",
  "domain": "example.com",
  "active": true,
  "daily_limit": 100,
  "hourly_limit": 4,
  "created_at": "2026-03-10T04:16:54Z"
}

Errors: 400 (sender already exists)


Testing

POST /testing/set-pack -- Set Pack (Testing Only)

Permission: full_access

Request Body:

| Field | Type | Required |
| --- | --- | --- |
| pack_code | string | Yes |

Response (200):

{
  "pack_code": "DEDICATED_100K",
  "pack_name": "Dedicated 100K",
  "domain_limit": 5,
  "daily_email_limit": 100000,
  "domains_updated": 3
}

Sets the pack for ALL of the user's domains, skipping the billing API and Kafka events. Works even when the user has 0 domains, in which case the pack is stored as a user-level assignment.

Errors: 400 (invalid pack_code)


Health Check

GET /healthz

Auth: None

Response (200):

{"status": "ok", "service": "email-transactional"}

Command Bus (19 Use Cases)

| Domain | Commands | External Services |
| --- | --- | --- |
| Email Sending | Send, Get, List | Kafka, Redis, MongoDB |
| Domains | Create, Verify, Get, List, Delete, UpdateTracking, GetQuota | BillingSync |
| Billing | GetPacks, GetCurrentPack, CreateSub, PAYG (4 cmds), SetPack | External Billing API |
| API Keys | Create, List, Get, Delete | Redis (cache eviction) |
| Senders | Create | Redis, Auth, BillingSync |

Business Rules

| Rule | Where | How |
| --- | --- | --- |
| Domain limit | CreateDomainUseCase | Checks pack.domain_limit via domain subscriptions, falling back to user_pack |
| Email quota | SendMailUseCase | Checks mkt_domain daily/hourly limits |
| Rate limit tracking | mkt_domain table | daily_tracking/hourly_tracking counters |
| API key auth | auth.py | Keys are stored as SHA-256 hashes; the plaintext key sent with each request is hashed and compared against the stored hash |
| SMTP credential cache | send.py -> Redis | AES-256-CBC encrypted, stored in Redis hash auth |
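
The API key rule can be illustrated with the standard library. This is a sketch of the general hash-and-compare technique, not the contents of auth.py; the function names are hypothetical, and the etrans_live_ prefix is taken from the example response for POST /api-keys.

```python
import hashlib
import hmac
import secrets
from typing import Tuple


def hash_api_key(plaintext: str) -> str:
    """Return the SHA-256 hex digest that would be stored instead of the key."""
    return hashlib.sha256(plaintext.encode("utf-8")).hexdigest()


def generate_api_key(prefix: str = "etrans_live_") -> Tuple[str, str]:
    """Create a new key; the plaintext is shown once, only the hash is kept."""
    plaintext = prefix + secrets.token_hex(24)
    return plaintext, hash_api_key(plaintext)


def verify_api_key(presented: str, stored_hash: str) -> bool:
    """Hash the presented key and compare it to the stored hash in constant time."""
    return hmac.compare_digest(hash_api_key(presented), stored_hash)
```

Storing only the hash is what makes the key unrecoverable after creation.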

API Environment Variables

| Category | Variable | Purpose |
| --- | --- | --- |
| Databases | DB_URI | Billing PostgreSQL (read-only) |
| Databases | USEND_DB_URI | Main app PostgreSQL |
| Databases | PROD_BILLING_DB_URI | Prod billing DB (for staging sync) |
| Kafka | KAFKA_BOOTSTRAP_SERVERS | Kafka brokers |
| Kafka | KAFKA_TOPIC | Sendmail topic (default: sendmail) |
| Kafka | KAFKA_BILLING_TOPIC | Billing events topic |
| Kafka | KAFKA_SASL_* | SASL authentication |
| Redis | REDIS_URI | Redis connection |
| MongoDB | MONGODB_URI / MONGODB_DBNAME / MONGODB_COLLECTION | Log storage |
| Auth | MANAGE_AUTH_URI | BizFly auth API |
| Auth | KEYSTONE_* | Keystone admin auth |
| Encryption | TOKEN_SECRET / TOKEN_IV | AES-256-CBC for SMTP credentials |
| Billing | BILLING_V4_URI | External billing API |
| DNS | DEFAULT_SPF_VALUE, DEFAULT_MX_* | DNS record templates |
| Monitoring | SENTRY_DSN | Error tracking |
| Monitoring | APM_* | Elastic APM (currently disabled) |

Sendmail Worker (Go)

Purpose

Consumes email send requests from Kafka and delivers them via SMTP.

How It Works

See C4 Level 3: Sendmail Worker Components for the full component diagram.

Processing pipeline: Kafka Consumer → Worker Pool (10 goroutines) → Email Service → Redis (fetch creds) → AES Decrypt → SMTP Send

Key behaviors:

  • Manual Kafka offset commit with back-pressure (blocks until task.Done)
  • 5-minute timeout per email
  • 5 retries with exponential backoff (1s, 2s, 4s, 8s, 16s)
  • TLS opportunistic mode
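
The retry schedule above doubles the wait after every failed attempt. The sketch below shows the idea in Python for consistency with the other examples, although the worker itself is written in Go. It assumes "5 retries" means one initial attempt plus up to five retries; the function names are hypothetical.

```python
import time
from typing import Callable, List


def backoff_delays(retries: int = 5, base_seconds: float = 1.0) -> List[float]:
    """Return the wait before each retry: base, 2x base, 4x base, and so on."""
    return [base_seconds * (2 ** attempt) for attempt in range(retries)]


def send_with_retry(send: Callable[[], object], retries: int = 5,
                    base_seconds: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep):
    """Call send() and retry on failure, sleeping per the backoff schedule."""
    delays = backoff_delays(retries, base_seconds)
    for attempt in range(retries + 1):
        try:
            return send()
        except Exception:
            if attempt == retries:
                # Retries exhausted: surface the last error to the caller.
                raise
            sleep(delays[attempt])
```

With the defaults, the waits add up to 31 seconds, well inside the 5-minute per-email timeout.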

Kafka Message Format (Input)

{
  "payload": {
    "sender": { "name": "Sender Name", "email": "sender@example.com" },
    "recipients": ["to@example.com"],
    "cc": ["cc@example.com"],
    "bcc": ["bcc@example.com"],
    "subject": "Subject line",
    "body": "<html>Email body</html>",
    "message_id": "<unique-message-id>",
    "attachments": [
      {
        "filename": "file.pdf",
        "content": "<base64-encoded>",
        "content_type": "application/pdf"
      }
    ],
    "user_id": "12345"
  }
}
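
One way the API's POST /emails body could be turned into this Kafka message is sketched below. The field mapping is an assumption made for illustration: in particular, preferring html over text for body and the helper name build_kafka_message are not taken from the source.

```python
from email.utils import parseaddr


def build_kafka_message(request: dict, message_id: str, user_id: str) -> dict:
    """Map a POST /emails request body onto the sendmail topic's message shape."""
    # "Name <email@domain.com>" -> ("Name", "email@domain.com"); a bare address gives ("", address).
    name, address = parseaddr(request["from"])
    return {
        "payload": {
            "sender": {"name": name, "email": address},
            "recipients": request["to"],
            "cc": request.get("cc", []),
            "bcc": request.get("bcc", []),
            "subject": request["subject"],
            # Assumption: the HTML body is preferred when both html and text are given.
            "body": request.get("html") or request.get("text", ""),
            "message_id": message_id,
            "attachments": request.get("attachments", []),
            "user_id": user_id,
        }
    }
```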

Environment Variables

| Variable | Default | Purpose |
| --- | --- | --- |
| KAFKA_BOOTSTRAP_SERVERS | required | Kafka brokers |
| KAFKA_TOPIC | required | Topic to consume (sendmail) |
| KAFKA_GROUP_ID | required | Consumer group ID |
| KAFKA_SASL_* | - | SASL authentication |
| REDIS_HOST / REDIS_PORT / REDIS_DB | required | Redis for SMTP creds |
| SMTP_HOST / SMTP_PORT | required | SMTP server |
| SMTP_SECURE | false | TLS mode |
| TOKEN_SECRET | required | AES-256 key (32 bytes) |
| TOKEN_IV | required | AES-256 IV (16 bytes) |
| WORKER_POOL_SIZE | 10 | Concurrent goroutines |
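
The required/default split in this table suggests a fail-fast check at startup. The sketch below expresses that check in Python for consistency with the other examples; the worker's real loader is config/config.go. It assumes TOKEN_SECRET and TOKEN_IV are supplied as raw strings whose byte lengths must be 32 and 16, which the document does not confirm.

```python
import os
from typing import Mapping

REQUIRED = (
    "KAFKA_BOOTSTRAP_SERVERS", "KAFKA_TOPIC", "KAFKA_GROUP_ID",
    "REDIS_HOST", "REDIS_PORT", "REDIS_DB",
    "SMTP_HOST", "SMTP_PORT", "TOKEN_SECRET", "TOKEN_IV",
)


def load_sendmail_config(env: Mapping[str, str] = os.environ) -> dict:
    """Validate required variables and apply the documented defaults."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError("missing required variables: " + ", ".join(missing))
    # Assumption: key and IV are raw strings, not hex or base64.
    secret = env["TOKEN_SECRET"].encode("utf-8")
    iv = env["TOKEN_IV"].encode("utf-8")
    if len(secret) != 32:
        raise ValueError("TOKEN_SECRET must be 32 bytes for AES-256")
    if len(iv) != 16:
        raise ValueError("TOKEN_IV must be 16 bytes")
    return {
        "kafka_brokers": env["KAFKA_BOOTSTRAP_SERVERS"].split(","),
        "kafka_topic": env["KAFKA_TOPIC"],
        "kafka_group_id": env["KAFKA_GROUP_ID"],
        "redis_addr": f"{env['REDIS_HOST']}:{env['REDIS_PORT']}",
        "redis_db": int(env["REDIS_DB"]),
        "smtp_addr": f"{env['SMTP_HOST']}:{env['SMTP_PORT']}",
        "smtp_secure": env.get("SMTP_SECURE", "false").lower() == "true",
        "token_secret": secret,
        "token_iv": iv,
        "worker_pool_size": int(env.get("WORKER_POOL_SIZE", "10")),
    }
```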

Key Files

| File | Purpose |
| --- | --- |
| sendmail_worker/main.go | Entry point, wiring |
| sendmail_worker/config/config.go | Env var loading + validation |
| sendmail_worker/config/crypto.go | AES-256-CBC decryption |
| sendmail_worker/kafka/consumer.go | Sarama consumer with back-pressure |
| sendmail_worker/kafka/message.go | Message structs |
| sendmail_worker/worker/processor.go | Goroutine pool |
| sendmail_worker/email/service.go | SMTP sending + retry logic |
| sendmail_worker/redis/client.go | Credential fetch from Redis |

Fetch Log Worker (Go)

Purpose

Consumes SMTP delivery log events from Kafka (published by Filebeat) and stores them in MongoDB.

How It Works

See C4 Level 3: Fetch Log Worker Components for the full component diagram.

Processing pipeline: Filebeat → Kafka → Consumer → Worker Pool (10 goroutines) → Event Filter → MongoDB Insert

Key behaviors:

  • Manual Kafka offset commit with back-pressure (blocks until task.Done)
  • Default offset: earliest (processes historical events)
  • Only stores: Sender, Queue, Deferred, Reject events
  • Skips messages matching bizfly.vn pattern (internal emails)
  • Auto-creates MongoDB indexes on startup
  • Errors reported to Sentry
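
The filtering behavior in the list above comes down to two checks per event. The sketch below is written in Python for consistency with the other examples, while the worker is Go. It assumes the skip pattern is a plain substring test against message_id (suggested by the variable name SKIP_MESSAGE_ID_PATTERN); the real worker may match differently.

```python
DEFAULT_STORE_EVENT_TYPES = "Sender,Queue,Deferred,Reject"
DEFAULT_SKIP_PATTERN = "bizfly.vn"


def should_store(event: dict,
                 store_event_types: str = DEFAULT_STORE_EVENT_TYPES,
                 skip_pattern: str = DEFAULT_SKIP_PATTERN) -> bool:
    """Decide whether a log event is stored or silently dropped."""
    allowed = {name.strip() for name in store_event_types.split(",") if name.strip()}
    if event.get("Event") not in allowed:
        # Event types outside the configured list are filtered out.
        return False
    # Assumption: the pattern is matched as a substring of message_id.
    if skip_pattern and skip_pattern in event.get("message_id", ""):
        return False
    return True
```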

Log Event Format (Input from Kafka)

{
  "Event": "Sender",
  "@timestamp": "2026-02-01T10:30:15.000Z",
  "SMTP_Action": "delivered",
  "from": "hello@yourdomain.com",
  "from_name": "Hello",
  "recipient": "user@example.com",
  "to_name": "User",
  "response": "250 2.0.0 OK",
  "from_domain": "yourdomain.com",
  "Event_ID": "evt_123",
  "message_id": "msg_1234567890",
  "to_domain": "example.com",
  "agent": {
    "id": "agent_1",
    "type": "filebeat",
    "hostname": "host1",
    "name": "agent-name",
    "ephemeral_id": "eph_1"
  }
}

Event Types

| Event Type | Meaning | Stored? |
| --- | --- | --- |
| Sender | Email accepted by SMTP server | Yes |
| Queue | Email queued for delivery | Yes |
| Deferred | Delivery temporarily failed, will retry | Yes |
| Reject | Email rejected | Yes |
| Others | Various SMTP events | No (filtered out) |

MongoDB Document (Output)

| MongoDB Field | Source |
| --- | --- |
| event | Event |
| timestamp | @timestamp |
| action | SMTP_Action |
| sender | from |
| recipient | recipient |
| response | response (full SMTP response) |
| from_domain | from_domain |
| message_id | message_id |
| to_domain | to_domain |
| event_id | Event_ID |
| agent_* | Agent metadata (type, name, hostname) |
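
The field mapping above can be read as a single conversion function. The following sketch assumes the agent_* fields are named agent_type, agent_name, and agent_hostname, and that the timestamp is stored as a UTC datetime; neither detail is confirmed by the source, and to_mongo_document is a hypothetical name.

```python
from datetime import datetime, timezone


def to_mongo_document(event: dict) -> dict:
    """Convert a Filebeat log event into the document shape stored in MongoDB."""
    agent = event.get("agent", {})
    # "@timestamp" looks like 2026-02-01T10:30:15.000Z in the example event.
    timestamp = datetime.strptime(
        event["@timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ"
    ).replace(tzinfo=timezone.utc)
    return {
        "event": event.get("Event"),
        "timestamp": timestamp,
        "action": event.get("SMTP_Action"),
        "sender": event.get("from"),
        "recipient": event.get("recipient"),
        "response": event.get("response"),
        "from_domain": event.get("from_domain"),
        "message_id": event.get("message_id"),
        "to_domain": event.get("to_domain"),
        "event_id": event.get("Event_ID"),
        # Assumed field names for the agent metadata.
        "agent_type": agent.get("type"),
        "agent_name": agent.get("name"),
        "agent_hostname": agent.get("hostname"),
    }
```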

MongoDB Indexes

| Index Name | Fields | Purpose |
| --- | --- | --- |
| idx_from_domain_timestamp | from_domain ASC, timestamp DESC | Log queries filtered by domain |
| idx_message_id | message_id | Single email log lookup |
| idx_event_timestamp | event ASC, timestamp DESC | Event type filtering |
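
For readers who want to reproduce these indexes by hand, the definitions could look like the sketch below. It is written against the pymongo-style `create_index(keys, name=...)` call and is not the worker's Go implementation in mongo/client.go. The ascending direction for idx_message_id is an assumption, since the table does not state it.

```python
ASC, DESC = 1, -1  # same numeric values as pymongo.ASCENDING / pymongo.DESCENDING

# (index name, ordered key list) for each index in the table.
INDEXES = [
    ("idx_from_domain_timestamp", [("from_domain", ASC), ("timestamp", DESC)]),
    ("idx_message_id", [("message_id", ASC)]),  # direction assumed
    ("idx_event_timestamp", [("event", ASC), ("timestamp", DESC)]),
]


def ensure_indexes(collection) -> list:
    """Create every index on a pymongo-style collection and return the index names."""
    return [collection.create_index(keys, name=name) for name, keys in INDEXES]
```

Calling `ensure_indexes` on every startup is safe in this sketch because creating an index that already exists with the same definition does nothing.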

Environment Variables

| Variable | Default | Purpose |
| --- | --- | --- |
| KAFKA_BOOTSTRAP_SERVERS | required | Kafka brokers |
| LOG_TOPIC | required | Topic to consume (email-logs) |
| LOG_GROUP_ID | required | Consumer group ID |
| KAFKA_SASL_* | - | SASL authentication |
| MONGODB_URI | required | MongoDB connection |
| MONGODB_DBNAME | required | Database name |
| MONGODB_COLLECTION | required | Collection name |
| LOG_WORKER_POOL_SIZE | 10 | Concurrent goroutines |
| LOG_PROCESS_TIMEOUT_SECONDS | 10 | Processing timeout |
| LOG_MONGO_INSERT_TIMEOUT_SECONDS | 10 | Insert timeout |
| STORE_EVENT_TYPES | Sender,Queue,Deferred,Reject | Which events to store |
| SKIP_MESSAGE_ID_PATTERN | bizfly.vn | Skip internal messages |
| SENTRY_DSN | - | Error tracking |
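
The required/default split in this table can be illustrated with a small loader. This is a Python sketch under stated assumptions, not a port of config/config.go. WorkerConfig and load_config are hypothetical names, and the KAFKA_SASL_* variables are left out because their exact names are not listed here.

```python
import os
from dataclasses import dataclass

REQUIRED = (
    "KAFKA_BOOTSTRAP_SERVERS", "LOG_TOPIC", "LOG_GROUP_ID",
    "MONGODB_URI", "MONGODB_DBNAME", "MONGODB_COLLECTION",
)


@dataclass(frozen=True)
class WorkerConfig:
    values: dict  # the required variables, keyed by name
    pool_size: int
    process_timeout_seconds: int
    mongo_insert_timeout_seconds: int
    store_event_types: tuple
    skip_message_id_pattern: str
    sentry_dsn: str


def load_config(env=None) -> WorkerConfig:
    """Build a WorkerConfig from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError("missing required variables: " + ", ".join(missing))
    raw_types = env.get("STORE_EVENT_TYPES", "Sender,Queue,Deferred,Reject")
    return WorkerConfig(
        values={name: env[name] for name in REQUIRED},
        pool_size=int(env.get("LOG_WORKER_POOL_SIZE", "10")),
        process_timeout_seconds=int(env.get("LOG_PROCESS_TIMEOUT_SECONDS", "10")),
        mongo_insert_timeout_seconds=int(env.get("LOG_MONGO_INSERT_TIMEOUT_SECONDS", "10")),
        store_event_types=tuple(t.strip() for t in raw_types.split(",") if t.strip()),
        skip_message_id_pattern=env.get("SKIP_MESSAGE_ID_PATTERN", "bizfly.vn"),
        sentry_dsn=env.get("SENTRY_DSN", ""),
    )
```

Failing fast on a missing required variable keeps a misconfigured worker from starting and silently consuming nothing.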

Key Files

| File | Purpose |
| --- | --- |
| main.go | Entry point, wiring |
| config/config.go | Env var loading + filter config |
| kafka/consumer.go | Sarama consumer with back-pressure |
| worker/pool.go | Goroutine pool + event filtering |
| events/log_sender.go | LogEvent struct + MongoDB document conversion |
| mongo/client.go | MongoDB insert + index management |
| observability/sentry.go | Custom Sentry error reporting |

Deployment

Docker Services

| Service | Dockerfile | Exposed Port |
| --- | --- | --- |
| email_transaction_api | ./Dockerfile | 8000 |
| sendmail_worker | fetch_log_worker_go/sendmail_worker/Dockerfile | - |
| fetch_log_worker | fetch_log_worker_go/Dockerfile | - |

Git Repositories

| Repo | Path | Remote |
| --- | --- | --- |
| API (root) | /email-transactional/ | git@git.paas.vn:devteam/vce/email-transactional.git |
| Combined Workers | /email-transactional/fetch_log_worker_go/ | git@git.paas.vn:devteam/vce/etrans-fetch-log-worker.git |

The standalone sendmail_worker/ repo is deprecated. The source of truth for the Sendmail Worker is fetch_log_worker_go/sendmail_worker/.


Status Mapping (API Display Labels)

The API maps raw SMTP statuses to user-friendly labels:

| Raw Status (DB/Logs) | Display Label | Source |
| --- | --- | --- |
| queued | Queued | API sets on send |
| sender / sent | Sent | SMTP accepted |
| delivered / delivery | Delivered | SMTP delivered |
| deferred | Delayed | SMTP temporary failure |
| bounced / reject / rejected | Bounced | SMTP permanent failure |

Filters on GET /emails accept both raw statuses and display names; a display name is translated to its raw status before querying (for example, delayed -> deferred).
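
A lookup table is one straightforward way to implement this mapping. The sketch below is illustrative only: the names are hypothetical, and expanding a filter value to every raw status that shares its display label is an assumption about how the filter behaves.

```python
# Raw status -> display label, following the status mapping table.
RAW_TO_DISPLAY = {
    "queued": "Queued",
    "sender": "Sent",
    "sent": "Sent",
    "delivered": "Delivered",
    "delivery": "Delivered",
    "deferred": "Delayed",
    "bounced": "Bounced",
    "reject": "Bounced",
    "rejected": "Bounced",
}


def display_label(raw_status: str) -> str:
    """Translate a raw status from the DB or logs into its display label."""
    return RAW_TO_DISPLAY[raw_status.strip().lower()]


def raw_statuses_for_filter(value: str) -> list:
    """Expand a filter value (raw status or display name) into raw statuses."""
    key = value.strip().lower()
    label = RAW_TO_DISPLAY.get(key)
    if label is None:
        # Not a raw status, so try it as a display name such as "delayed".
        label = next((l for l in RAW_TO_DISPLAY.values() if l.lower() == key), None)
    if label is None:
        raise ValueError(f"unknown status filter: {value!r}")
    return [raw for raw, l in RAW_TO_DISPLAY.items() if l == label]
```

With this table, `raw_statuses_for_filter("delayed")` yields `["deferred"]`, matching the example in the text.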


State Machine Diagrams

Email Lifecycle

stateDiagram-v2
    [*] --> Queued: POST /emails

    Queued --> Sent: Sendmail Worker picks up<br>from Kafka & SMTP accepts
    Queued --> Bounced: Sendmail Worker fails<br>after 5 attempts

    Sent --> Delivered: SMTP server confirms<br>delivery to recipient
    Sent --> Deferred: SMTP temporary failure<br>(will retry)
    Sent --> Bounced: SMTP permanent rejection<br>(invalid address, etc.)

    Deferred --> Delivered: SMTP retry succeeds
    Deferred --> Bounced: SMTP gives up<br>after retries
    Deferred --> Deferred: SMTP retries again

    Delivered --> [*]
    Bounced --> [*]

    note right of Queued
        API saves to USEND DB
        Publishes to Kafka
    end note

    note right of Sent
        Sendmail Worker sends via SMTP
        Filebeat captures logs
    end note

    note right of Deferred
        Display label: "Delayed"
    end note
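
The email lifecycle transitions can also be written as a table and checked in code. This is a sketch for illustration; nothing here claims the API or the workers enforce transitions this way, and the names are hypothetical.

```python
# Allowed next states for each email state, copied from the lifecycle diagram.
TRANSITIONS = {
    "Queued": {"Sent", "Bounced"},
    "Sent": {"Delivered", "Deferred", "Bounced"},
    "Deferred": {"Delivered", "Bounced", "Deferred"},
    "Delivered": set(),  # terminal
    "Bounced": set(),    # terminal
}


def can_transition(current: str, new: str) -> bool:
    """Return True when the diagram allows moving from current to new."""
    return new in TRANSITIONS[current]


def is_terminal(state: str) -> bool:
    """Delivered and Bounced have no outgoing transitions."""
    return not TRANSITIONS[state]
```

A guard like `can_transition` would, for instance, ignore a late Deferred log event that arrives after an email is already Delivered.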

Domain Lifecycle

stateDiagram-v2
    [*] --> Pending: POST /domains

    Pending --> Verified: POST /domains/{id}/verify<br>(all DNS records pass)
    Pending --> Pending: POST /domains/{id}/verify<br>(DNS check fails)

    Verified --> Deleted: DELETE /domains/{id}
    Pending --> Deleted: DELETE /domains/{id}

    Deleted --> [*]

    state Verified {
        [*] --> Active
        Active --> TrackingUpdated: PATCH /domains/{id}<br>(click/open tracking)
        TrackingUpdated --> Active
    }

    note right of Pending
        DNS records generated:
        SPF, MX (x3), DKIM
        User must configure DNS
    end note

    note right of Verified
        Domain can send emails
        MKTDomain created with
        daily/hourly limits
    end note

API Key Lifecycle

stateDiagram-v2
    [*] --> Active: POST /api-keys<br>(key shown once)

    Active --> Revoked: DELETE /api-keys/{id}
    Active --> Expired: expires_at reached

    Revoked --> [*]
    Expired --> [*]

    state Active {
        [*] --> FullAccess
        [*] --> SendingAccess
        [*] --> ReadOnly
    }

    note right of Active
        Permission set at creation:
        full_access > sending_access > read_only
        Expiry: 7/30/60/90/180/365 days or never
    end note
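
The permission ordering and expiry rules in the note can be sketched as below. The interpretation that a higher permission includes the lower ones is an assumption drawn from the `full_access > sending_access > read_only` ordering, and all function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Higher rank includes the rights of lower ranks (assumed from the ordering).
PERMISSION_RANK = {"read_only": 1, "sending_access": 2, "full_access": 3}
ALLOWED_EXPIRY_DAYS = (7, 30, 60, 90, 180, 365)


def compute_expires_at(created_at: datetime, days: Optional[int]) -> Optional[datetime]:
    """Return the expiry timestamp, or None for a key that never expires."""
    if days is None:
        return None
    if days not in ALLOWED_EXPIRY_DAYS:
        raise ValueError(f"unsupported expiry: {days} days")
    return created_at + timedelta(days=days)


def key_allows(permission: str, required: str, expires_at: Optional[datetime],
               now: datetime, revoked: bool = False) -> bool:
    """Check that a key is Active and carries at least the required permission."""
    if revoked:
        return False  # Revoked state
    if expires_at is not None and now >= expires_at:
        return False  # Expired state
    return PERMISSION_RANK[permission] >= PERMISSION_RANK[required]
```

For example, a sending_access key created with a 30-day expiry passes a read_only check until day 30 and fails it afterwards.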

Subscription Lifecycle

stateDiagram-v2
    [*] --> NoSubscription

    NoSubscription --> Dedicated: POST /billing/subscription/subs<br>(pack_code + domain_id)
    NoSubscription --> PAYG: POST /billing/payg/subscriptions<br>(domain_id)

    Dedicated --> Dedicated: POST /billing/subscription/subs<br>(resize to different pack)
    Dedicated --> PAYG: POST /billing/payg/subscriptions<br>(switch to PAYG)
    Dedicated --> NoSubscription: Cancel

    PAYG --> Dedicated: POST /billing/subscription/subs<br>(switch to dedicated)
    PAYG --> NoSubscription: DELETE /billing/payg/subscriptions/{id}

    note right of Dedicated
        domain_subscription_status.pack_id
        points to email_transaction_pack
        MKTDomain: daily/hourly limits set
    end note

    note right of PAYG
        pack_code = "PAYG_SHARED"
        MKTDomain: daily=-1, hourly=-1
        (unlimited)
    end note

Sender Lifecycle

stateDiagram-v2
    [*] --> Created: POST /senders

    state Created {
        [*] --> Active
        Active: SMTP credentials cached in Redis
        Active: MKTDomain created (daily=100, hourly=4)
        Active: User created in Billing DB
    }

    note right of Created
        Creates:
        1. User record (Billing DB)
        2. MKTDomain (rate limits)
        3. Temp password
        4. SMTP creds in Redis (encrypted)
    end note

Email Sending Flow (Detailed State Machine)

stateDiagram-v2
    [*] --> ValidateAuth: POST /emails

    ValidateAuth --> CheckQuota: Auth OK
    ValidateAuth --> Rejected: 401/403

    CheckQuota --> CacheCredentials: Quota available
    CheckQuota --> Rejected: 400 Quota exceeded

    CacheCredentials --> SaveToDB: SMTP creds cached in Redis
    SaveToDB --> PublishKafka: EmailMessage saved (queued)
    PublishKafka --> ResponseSent: Published to Kafka topic
    ResponseSent --> [*]: 200 OK returned to client

    state "Sendmail Worker (Async)" as Worker {
        [*] --> ConsumeMessage
        ConsumeMessage --> FetchCreds: Read from Redis
        FetchCreds --> DecryptCreds: AES-256-CBC decrypt
        DecryptCreds --> ConnectSMTP
        ConnectSMTP --> SendEmail
        SendEmail --> Success: 250 OK
        SendEmail --> RetryBackoff: Temporary failure
        RetryBackoff --> ConnectSMTP: Attempt 2-5
        RetryBackoff --> Failed: All 5 attempts failed
    }

    Rejected --> [*]

    note right of Worker
        5-minute timeout per email
        Exponential backoff: 1s, 2s, 4s, 8s, 16s
    end note
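
The retry loop inside the worker can be sketched as follows. This is a Python illustration of the documented schedule, not the Go code in email/service.go. TemporaryFailure and send_with_retry are hypothetical names, the 5-minute per-email timeout is omitted, and skipping the wait after the final attempt is an assumption: the note lists five delays, but with five attempts only the first four fall between attempts.

```python
import time

MAX_ATTEMPTS = 5
BASE_DELAY_SECONDS = 1.0


class TemporaryFailure(Exception):
    """Stand-in for a retryable SMTP error."""


def backoff_delays(attempts: int = MAX_ATTEMPTS, base: float = BASE_DELAY_SECONDS) -> list:
    """Exponential schedule: base, 2*base, 4*base, ... one entry per attempt."""
    return [base * 2 ** i for i in range(attempts)]


def send_with_retry(send, sleep=time.sleep, max_attempts: int = MAX_ATTEMPTS) -> int:
    """Call send() until it succeeds; return the number of attempts used.

    Re-raises the last TemporaryFailure once max_attempts is exhausted.
    """
    delays = backoff_delays(max_attempts)
    for attempt in range(1, max_attempts + 1):
        try:
            send()
            return attempt
        except TemporaryFailure:
            if attempt == max_attempts:
                raise  # all attempts failed
            sleep(delays[attempt - 1])  # wait before the next attempt
    raise ValueError("max_attempts must be at least 1")
```

Passing `sleep` as a parameter keeps the loop testable without real waiting.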

Domain Creation with Limit Check

stateDiagram-v2
    [*] --> CheckPackLimit: POST /domains

    CheckPackLimit --> CheckDomainSub: Has domains with subscription?
    CheckPackLimit --> CheckUserPack: No domain subscriptions

    CheckDomainSub --> GetDomainLimit: Found pack via domain_subscription_status
    CheckUserPack --> GetDomainLimit: Found pack via user_pack table
    CheckUserPack --> Unlimited: No user_pack found

    GetDomainLimit --> Unlimited: domain_limit = 0
    GetDomainLimit --> CountDomains: domain_limit > 0

    CountDomains --> CreateAllowed: count < limit
    CountDomains --> LimitReached: count >= limit

    Unlimited --> CheckDuplicate
    CreateAllowed --> CheckDuplicate

    CheckDuplicate --> GenerateDKIM: Domain is unique
    CheckDuplicate --> Rejected: 400 Domain exists

    GenerateDKIM --> SaveDomain: DKIM keys generated
    SaveDomain --> SyncProd: Saved to Billing DB
    SyncProd --> ReturnDNS: Synced to production
    ReturnDNS --> [*]: 200 OK (DNS records)

    LimitReached --> [*]: 400 Domain limit reached
    Rejected --> [*]
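
The limit check at the top of this diagram reduces to two small decisions. The sketch below is a hedged illustration: the function names and the idea of passing the two candidate limits as arguments are assumptions, while the precedence (domain subscription first, user_pack second) and the "0 means unlimited" rule come from the diagram.

```python
from typing import Optional


def resolve_domain_limit(domain_sub_limit: Optional[int],
                         user_pack_limit: Optional[int]) -> Optional[int]:
    """Return the effective domain_limit, or None when domains are unlimited.

    domain_sub_limit: limit of the pack found via domain_subscription_status, if any.
    user_pack_limit: limit of the pack found via the user_pack table, if any.
    """
    # A pack found through a domain subscription takes precedence over user_pack.
    limit = domain_sub_limit if domain_sub_limit is not None else user_pack_limit
    if limit is None or limit == 0:
        return None  # no pack found, or domain_limit = 0: unlimited
    return limit


def can_create_domain(current_count: int, limit: Optional[int]) -> bool:
    """Creation is allowed when unlimited or when count < limit."""
    return limit is None or current_count < limit
```

When `can_create_domain` returns False the request ends with the 400 "Domain limit reached" response; otherwise the flow continues to the duplicate check.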