Email Transactional - System Overview
Architecture documentation following the C4 model — zoom from high-level context down to component details.
C4 Level 1: System Context Diagram
Who uses the system and what external systems does it depend on?
graph TB
classDef person fill:#08427b,stroke:#052e56,color:#fff
classDef system fill:#1168bd,stroke:#0b4884,color:#fff
classDef external fill:#999,stroke:#6b6b6b,color:#fff
Developer["👤 Developer / App<br><i>Sends transactional emails<br>via API or SMTP</i>"]:::person
Admin["👤 Admin<br><i>Manages domains, API keys,<br>billing subscriptions</i>"]:::person
ETrans["📧 Email Transactional<br>System<br><i>Accepts email requests,<br>delivers via SMTP,<br>tracks delivery status</i>"]:::system
BizFlyAuth["BizFly Auth<br><i>JWT token validation<br>& user lookup</i>"]:::external
BillingAPI["Billing API<br><i>Package pricing,<br>subscription management</i>"]:::external
RecipientInbox["Recipient Inbox<br><i>Gmail, Outlook, etc.</i>"]:::external
Sentry["Sentry<br><i>Error tracking</i>"]:::external
Developer -->|"POST /emails<br>(API Key)"| ETrans
Admin -->|"Bearer token<br>(domains, billing, keys)"| ETrans
ETrans -->|"Validate tokens"| BizFlyAuth
ETrans -->|"Check pricing & balance"| BillingAPI
ETrans -->|"Deliver emails via SMTP"| RecipientInbox
ETrans -->|"Report errors"| Sentry
C4 Level 2: Container Diagram
What are the running applications, data stores, and how do they communicate?
graph TB
classDef person fill:#08427b,stroke:#052e56,color:#fff
classDef container fill:#438dd5,stroke:#2e6295,color:#fff
classDef database fill:#438dd5,stroke:#2e6295,color:#fff
classDef queue fill:#d4a017,stroke:#a07b10,color:#fff
classDef external fill:#999,stroke:#6b6b6b,color:#fff
Developer["👤 Developer / App"]:::person
Admin["👤 Admin"]:::person
subgraph EmailTransactional ["Email Transactional System"]
API["API Server<br><i>Python 3.10 / FastAPI</i><br><i>:8000</i>"]:::container
SendmailWorker["Sendmail Worker<br><i>Go 1.24</i><br><i>Kafka consumer →<br>SMTP delivery</i>"]:::container
FetchLogWorker["Fetch Log Worker<br><i>Go 1.24</i><br><i>Kafka consumer →<br>MongoDB storage</i>"]:::container
BillingDB[("Billing DB<br><i>PostgreSQL</i><br><i>READ-ONLY</i><br><i>domains, dkim,<br>mkt_domain, users</i>")]:::database
UsendDB[("USEND DB<br><i>PostgreSQL</i><br><i>READ/WRITE</i><br><i>emails, api_keys,<br>packs, subscriptions</i>")]:::database
MongoDB[("MongoDB<br><i>logs_db</i><br><i>Transaction logs</i>")]:::database
Redis[("Redis<br><i>SMTP credential cache,<br>API key cache</i>")]:::database
SendmailTopic["Kafka: sendmail topic"]:::queue
LogTopic["Kafka: email-logs topic"]:::queue
end
SMTP["SMTP Server<br><i>Postfix</i>"]:::external
Filebeat["Filebeat<br><i>Log shipper</i>"]:::external
Developer -->|"POST /emails<br>(X-API-KEY)"| API
Admin -->|"Bearer token<br>(manage resources)"| API
API -->|"read domains,<br>quotas"| BillingDB
API -->|"read/write emails,<br>keys, packs"| UsendDB
API -->|"read delivery logs"| MongoDB
API -->|"cache/read<br>SMTP creds"| Redis
API -->|"publish email<br>payload (JSON)"| SendmailTopic
SendmailTopic -->|"consume"| SendmailWorker
SendmailWorker -->|"fetch & decrypt<br>SMTP creds"| Redis
SendmailWorker -->|"send email<br>(STARTTLS)"| SMTP
SMTP -->|"write mail logs"| Filebeat
Filebeat -->|"publish structured<br>log events"| LogTopic
LogTopic -->|"consume"| FetchLogWorker
FetchLogWorker -->|"filter & insert<br>log documents"| MongoDB
C4 Level 3: Component Diagrams
API Server Components
What are the internal modules of the Python API?
graph TB
classDef component fill:#85bbf0,stroke:#5d82a8,color:#000
classDef database fill:#438dd5,stroke:#2e6295,color:#fff
classDef queue fill:#d4a017,stroke:#a07b10,color:#fff
classDef external fill:#999,stroke:#6b6b6b,color:#fff
Client["👤 Client"]
subgraph API ["API Server (Python / FastAPI)"]
subgraph Routes ["API Layer (src/api/routes/)"]
EmailRoutes["Email Routes<br><i>/emails, /emails/batch</i>"]:::component
DomainRoutes["Domain Routes<br><i>/domains, /domains/{id}</i>"]:::component
BillingRoutes["Billing Routes<br><i>/billing/subscription,<br>/billing/payg</i>"]:::component
ApiKeyRoutes["API Key Routes<br><i>/api-keys</i>"]:::component
LogRoutes["Log Routes<br><i>/logs</i>"]:::component
SenderRoutes["Sender Routes<br><i>/senders</i>"]:::component
TestRoutes["Testing Routes<br><i>/testing/set-pack</i>"]:::component
end
Auth["Auth Middleware<br><i>Bearer token validation,<br>API key lookup,<br>permission check</i>"]:::component
CommandBus["Command Bus<br><i>Routes commands to<br>use cases</i>"]:::component
subgraph UseCases ["Application Layer (src/app/use_cases/)"]
SendMail["SendMailUseCase<br><i>Validate, cache creds,<br>publish to Kafka</i>"]:::component
CreateDomain["CreateDomainUseCase<br><i>Check limit, generate<br>DKIM, sync to prod</i>"]:::component
VerifyDomain["VerifyDomainUseCase<br><i>DNS verification</i>"]:::component
CreateSub["CreateSubscriptionUseCase<br><i>Assign pack to domain</i>"]:::component
ManageKeys["API Key UseCases<br><i>Create, List, Delete</i>"]:::component
SetPack["SetPackUseCase<br><i>Testing: assign pack<br>to all domains</i>"]:::component
end
subgraph Adapters ["Adapter Layer (src/adapters/)"]
UoW["Unit of Work<br><i>Transaction management,<br>12 repositories</i>"]:::component
KafkaPublisher["Kafka Publisher<br><i>Publish email payloads</i>"]:::component
RedisCache["Redis Client<br><i>Credential & key caching</i>"]:::component
BillingSync["Billing Sync Service<br><i>Sync to prod billing DB</i>"]:::component
end
end
BillingDB[("Billing DB")]:::database
UsendDB[("USEND DB")]:::database
MongoDB[("MongoDB")]:::database
Redis[("Redis")]:::database
Kafka["Kafka: sendmail"]:::queue
BizFlyAuth["BizFly Auth"]:::external
Client --> Auth
Auth -->|"validate token"| BizFlyAuth
Auth --> Routes
Routes --> CommandBus
CommandBus --> UseCases
SendMail --> UoW
SendMail --> KafkaPublisher
SendMail --> RedisCache
CreateDomain --> UoW
CreateDomain --> BillingSync
CreateSub --> UoW
ManageKeys --> UoW
SetPack --> UoW
UoW --> BillingDB
UoW --> UsendDB
LogRoutes -->|"read logs"| MongoDB
KafkaPublisher --> Kafka
RedisCache --> Redis
Sendmail Worker Components
graph TB
classDef component fill:#85bbf0,stroke:#5d82a8,color:#000
classDef external fill:#999,stroke:#6b6b6b,color:#fff
classDef queue fill:#d4a017,stroke:#a07b10,color:#fff
Kafka["Kafka: sendmail topic"]:::queue
subgraph Worker ["Sendmail Worker (Go 1.24)"]
Consumer["Kafka Consumer<br><i>Sarama ConsumerGroup<br>Manual offset commit<br>Back-pressure via Done channel</i>"]:::component
Pool["Worker Pool<br><i>10 goroutines (configurable)<br>Buffered channel (cap 100)</i>"]:::component
EmailService["Email Service<br><i>Build MIME message<br>5-min timeout per email<br>5 retries, exponential backoff</i>"]:::component
RedisClient["Redis Client<br><i>Fetch hash 'auth' by user_id<br>10 retries on connect</i>"]:::component
Crypto["AES-256-CBC Decryptor<br><i>Decrypt SMTP credentials<br>TOKEN_SECRET + TOKEN_IV</i>"]:::component
end
Redis[("Redis")]:::external
SMTP["SMTP Server"]:::external
Kafka --> Consumer
Consumer -->|"Task{EmailPayload, Done}"| Pool
Pool --> EmailService
EmailService --> RedisClient
RedisClient -->|"base64 encrypted creds"| Redis
RedisClient --> Crypto
Crypto -->|"email + password"| EmailService
EmailService -->|"STARTTLS / go-mail"| SMTP
Fetch Log Worker Components
graph TB
classDef component fill:#85bbf0,stroke:#5d82a8,color:#000
classDef external fill:#999,stroke:#6b6b6b,color:#fff
classDef queue fill:#d4a017,stroke:#a07b10,color:#fff
classDef database fill:#438dd5,stroke:#2e6295,color:#fff
Kafka["Kafka: email-logs topic"]:::queue
subgraph Worker ["Fetch Log Worker (Go 1.24)"]
Consumer["Kafka Consumer<br><i>Sarama ConsumerGroup<br>Offset: earliest<br>Back-pressure via Done channel</i>"]:::component
Pool["Worker Pool<br><i>10 goroutines (configurable)<br>Buffered channel (cap 100)</i>"]:::component
EventFilter["Event Filter<br><i>Accept: Sender, Queue,<br>Deferred, Reject<br>Skip: bizfly.vn pattern</i>"]:::component
MongoClient["MongoDB Client<br><i>Insert log documents<br>Auto-create indexes</i>"]:::component
SentryReporter["Sentry Reporter<br><i>Custom HTTP client<br>Tags: topic, partition, offset</i>"]:::component
end
MongoDB[("MongoDB<br><i>Indexes:<br>from_domain + timestamp<br>message_id<br>event + timestamp</i>")]:::database
Sentry["Sentry"]:::external
Kafka --> Consumer
Consumer -->|"Task{raw message}"| Pool
Pool --> EventFilter
EventFilter -->|"accepted events"| MongoClient
EventFilter -->|"dropped"| Drop["Silent drop"]
MongoClient --> MongoDB
Pool -->|"errors"| SentryReporter
SentryReporter --> Sentry
Data Flow: Send Email (Sequence)
sequenceDiagram
participant C as Client
participant API as API Server
participant PG as PostgreSQL
participant R as Redis
participant K1 as Kafka (sendmail)
participant SW as Sendmail Worker
participant SMTP as SMTP Server
participant FB as Filebeat
participant K2 as Kafka (email-logs)
participant FLW as Fetch Log Worker
participant M as MongoDB
C->>API: POST /emails
API->>PG: Check quotas (mkt_domain)
API->>R: Cache SMTP credentials (AES encrypted)
API->>PG: Save EmailMessage (status: queued)
API->>K1: Publish email payload
API-->>C: 200 OK (message_id)
K1->>SW: Consume message
SW->>R: Fetch SMTP creds (hash "auth")
SW->>SW: Decrypt AES-256-CBC
SW->>SMTP: Send email (retry 5x)
SMTP->>FB: Delivery logs
FB->>K2: Publish log event
K2->>FLW: Consume event
FLW->>FLW: Filter (Sender/Queue/Deferred/Reject)
FLW->>M: Insert log document
C->>API: GET /logs
API->>M: Query logs
API-->>C: Mapped status labels
Three Services
| Service | Language | Port | Role |
|---|---|---|---|
| API Server | Python 3.10 / FastAPI | 8000 | HTTP endpoints, business logic, publishes to Kafka |
| Sendmail Worker | Go 1.24 | - | Consumes email requests from Kafka, delivers via SMTP |
| Fetch Log Worker | Go 1.24 | - | Consumes SMTP delivery logs from Kafka, stores in MongoDB |
End-to-End Flow: Sending an Email
| Step | Component | Actions |
|---|---|---|
| 1 | Client | Sends POST /emails with email payload |
| 2 | API | Validates auth (Bearer/API key) |
| | | Checks quotas (daily/hourly from mkt_domain in Billing DB) |
| | | Caches SMTP credentials to Redis (AES-256-CBC encrypted) |
| | | Saves EmailMessage record to USEND DB (status: queued) |
| | | Publishes email payload to Kafka sendmail topic |
| 3 | Sendmail Worker | Consumes message from Kafka |
| | | Fetches SMTP credentials from Redis (hash auth, key=user_id) |
| | | Decrypts credentials with AES-256-CBC |
| | | Sends email via SMTP (go-mail library, 5 retries, exponential backoff) |
| 4 | SMTP Server | Delivers the email, writes delivery logs to disk |
| 5 | Filebeat | Parses SMTP logs, publishes structured events to Kafka email-logs topic |
| 6 | Fetch Log Worker | Consumes log event from Kafka |
| | | Filters by event type (Sender/Queue/Deferred/Reject) |
| | | Skips internal messages (bizfly.vn pattern) |
| | | Inserts log document into MongoDB |
| 7 | Client | Queries GET /logs to see delivery status |
| | API | Reads from MongoDB, maps raw statuses to display labels |
Shared Infrastructure
| Infrastructure | Who Writes | Who Reads | Shared Config |
|---|---|---|---|
| Kafka (sendmail topic) | API (Python) | Sendmail Worker (Go) | KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC |
| Kafka (email-logs topic) | Filebeat (external) | Fetch Log Worker (Go) | KAFKA_BOOTSTRAP_SERVERS, LOG_TOPIC |
| Redis | API (caches SMTP creds) | Sendmail Worker (decrypts creds) | REDIS_URI, TOKEN_SECRET, TOKEN_IV |
| MongoDB | Fetch Log Worker | API (reads logs) | MONGODB_URI, MONGODB_DBNAME |
| Billing DB (PostgreSQL) | External / Billing Sync | API (read-only queries) | DB_URI |
| USEND DB (PostgreSQL) | API | API | USEND_DB_URI |
API Service (Python)
Tech Stack
- Python 3.10, FastAPI, SQLAlchemy 2.0, Pydantic
- Clean Architecture: Domain -> App -> Adapters -> API
- Command Bus pattern for all use cases
Dual Database Architecture
| Database | Config | Access | Tables |
|---|---|---|---|
| Billing DB | DB_URI | READ-ONLY | domain, dkim, mkt_domain, users, temporary_password |
| USEND DB | USEND_DB_URI | Read/Write | api_keys, email_messages, email_transaction_pack, domain_subscription_status, user_pack, domain_tracking_settings, transport |
CRITICAL: Never run migrations or modify schema on the Billing DB. It is managed externally.
Authentication
| Method | Header | Permission |
|---|---|---|
| Bearer Token | Authorization: Bearer <JWT> | Always full_access |
| API Key | X-API-KEY: <key> | Depends on key's configured level |
| Cookie | Session cookie | Passthrough |
Permission Hierarchy: read_only < sending_access < full_access (higher includes lower)
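The ordering above lends itself to a plain numeric comparison. The following is a minimal sketch, assuming the three level names map to increasing integers; the `Permission` enum and the `is_allowed` helper are hypothetical names used for illustration and are not taken from the API source.

```python
from enum import IntEnum


class Permission(IntEnum):
    """Permission levels ordered so that a higher value includes the lower ones."""
    READ_ONLY = 1
    SENDING_ACCESS = 2
    FULL_ACCESS = 3


def parse_permission(name: str) -> Permission:
    """Convert a level name such as "sending_access" into a Permission member."""
    return Permission[name.upper()]


def is_allowed(granted: str, required: str) -> bool:
    """Return True when the granted level is at least the required level."""
    return parse_permission(granted) >= parse_permission(required)
```

For example, `is_allowed("sending_access", "read_only")` is true, while `is_allowed("read_only", "sending_access")` is false.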
API Endpoints Quick Reference
| Group | Method | Path | Auth | Description |
|---|---|---|---|---|
| Emails | POST | /emails | sending_access | Send email |
| | POST | /emails/batch | sending_access | Send batch emails |
| | GET | /emails | read_only | List emails (paginated) |
| | GET | /emails/{message_id} | read_only | Get email details |
| Logs | GET | /logs | read_only | Get transaction logs |
| Domains | POST | /domains | full_access | Create domain |
| | GET | /domains | read_only | List domains |
| | GET | /domains/{domain_id} | read_only | Get domain details |
| | GET | /domains/{domain_id}/quota | read_only | Get domain quota |
| | POST | /domains/{domain_id}/verify | full_access | Verify domain DNS |
| | PATCH | /domains/{domain_id} | full_access | Update tracking |
| | DELETE | /domains/{domain_id} | full_access | Delete domain |
| API Keys | POST | /api-keys | full_access | Create API key |
| | GET | /api-keys | read_only | List API keys |
| | GET | /api-keys/{key_id} | read_only | Get API key |
| | DELETE | /api-keys/{key_id} | full_access | Delete API key |
| Billing | GET | /billing/subscription/packs | read_only | List packages |
| | GET | /billing/subscription/current-pack | read_only | Get current pack |
| | POST | /billing/subscription/subs | full_access | Create subscription |
| | GET | /billing/payg/packages | read_only | Get PAYG packages |
| | POST | /billing/payg/subscriptions | full_access | Create PAYG sub |
| | GET | /billing/payg/subscriptions/{domain_id} | read_only | Get current PAYG sub |
| | DELETE | /billing/payg/subscriptions/{domain_id} | full_access | Cancel PAYG sub |
| Senders | POST | /senders | full_access | Create SMTP sender |
| Testing | POST | /testing/set-pack | full_access | Set pack for testing |
| Health | GET | /healthz | none | Health check |
Detailed API Reference
Emails
POST /emails -- Send Email
Permission: sending_access
Request Body:
| Field | Type | Required | Description |
|---|---|---|---|
| from | string | Yes | "email@domain.com" or "Name <email@domain.com>" |
| to | string[] | Yes | Recipient email addresses |
| subject | string | Yes | Email subject |
| html | string | No* | HTML body (*at least one of html or text required) |
| text | string | No* | Plain text body |
| cc | string[] | No | CC recipients |
| bcc | string[] | No | BCC recipients |
| reply_to | string | No | Reply-to address |
| attachments | object[] | No | Attachments (see below) |
| headers | object | No | Custom email headers |
| tags | object[] | No | Tags (name/value pairs) |
| template_id | string | No | Template identifier |
| template_data | object | No | Template rendering data |
Attachment object:
| Field | Type | Required |
|---|---|---|
| filename | string | Yes |
| content | string | Yes (base64-encoded) |
| content_type | string | Yes |
Response (200):
{
"id": "<message-id>",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"status": "queued",
"created_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (no body, invalid attachment, size > 25MB, quota exceeded, domain not verified) | 401 | 403
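A client-side sketch of assembling this request with only the Python standard library follows. `BASE_URL` and `build_send_request` are hypothetical placeholders chosen for illustration; the header and field names follow the tables above. The request object is built but not sent, and the client-side check for a missing body simply mirrors the documented 400 case.

```python
import base64
import json
import urllib.request

BASE_URL = "https://api.example.invalid"  # hypothetical host, replace with the real one


def build_send_request(api_key, sender, recipients, subject,
                       html=None, text=None, attachments=None):
    """Build (but do not send) a POST /emails request.

    attachments is an optional list of (filename, raw_bytes, content_type) tuples.
    """
    if not html and not text:
        raise ValueError("at least one of html or text is required")
    payload = {"from": sender, "to": list(recipients), "subject": subject}
    if html:
        payload["html"] = html
    if text:
        payload["text"] = text
    if attachments:
        # Attachment content must be base64-encoded per the table above.
        payload["attachments"] = [
            {
                "filename": name,
                "content": base64.b64encode(data).decode("ascii"),
                "content_type": content_type,
            }
            for name, data, content_type in attachments
        ]
    return urllib.request.Request(
        f"{BASE_URL}/emails",
        data=json.dumps(payload).encode("utf-8"),
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would perform the call; a key with at least sending_access is needed for it to succeed.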
POST /emails/batch -- Send Batch Emails
Permission: sending_access
Request Body:
| Field | Type | Required |
|---|---|---|
| emails | SendEmailRequest[] | Yes (same schema as single send) |
Response (200):
{
"data": [{"id": "<message-id>"}],
"errors": [{"index": 2, "error": "Domain not verified"}]
}
Per-email errors are returned in the errors array, not as HTTP errors.
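Because failures arrive inside a 200 response, a caller has to inspect the errors array itself. The sketch below pairs each error with the email that caused it. It assumes that `index` is the zero-based position in the submitted emails array, which the source does not state; `split_batch_result` is a hypothetical helper name.

```python
def split_batch_result(submitted, response):
    """Separate accepted message IDs from per-email failures.

    submitted: the list of email dicts that was sent in the "emails" field.
    response:  the decoded JSON body of the batch response.
    Assumes "index" is a zero-based position into the submitted list.
    """
    accepted_ids = [item["id"] for item in response.get("data", [])]
    failed = [
        (err["index"], submitted[err["index"]], err["error"])
        for err in response.get("errors", [])
    ]
    return accepted_ids, failed
```

The failed entries can then be retried or logged individually without treating the whole batch as an error.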
GET /emails -- List Emails
Permission: read_only
Query Parameters:
| Param | Type | Default | Description |
|---|---|---|---|
| page | int | 1 | Page number (min: 1) |
| per_page | int | 20 | Items per page (1-100) |
| status | string | - | Filter: queued,sent,delivered,bounced,deferred,delayed (comma-separated) |
| created_at_from | string | - | ISO-8601 UTC lower bound |
| created_at_to | string | - | ISO-8601 UTC upper bound |
Response (200):
{
"data": [
{
"id": "msg_abc123",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"subject": "Hello",
"status": "Delivered",
"created_at": "2026-03-10T04:16:54Z"
}
],
"total": 42,
"page": 1,
"per_page": 20,
"stats": [
{"label": "Delivered", "count": 30, "percentage": 71.4},
{"label": "Bounced", "count": 12, "percentage": 28.6}
]
}
Notes: Status values in response are display labels (Delivered, Delayed, Bounced, Sent, Queued). Filter accepts both raw (deferred) and display (delayed) names.
Errors: 400 (invalid status, invalid timestamp, from > to, per_page out of range)
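The percentages in the stats array are consistent with each count divided by total and rounded to one decimal place (30 of 42 gives 71.4). A small sketch of that arithmetic follows, assuming this rounding rule; `build_stats` is a hypothetical name and the server's actual implementation may differ.

```python
def build_stats(counts, total):
    """Turn {label: count} into the stats list shape shown in the response.

    Percentages are taken relative to `total` and rounded to one decimal
    place, which is an assumption inferred from the example figures.
    """
    if total <= 0:
        return []
    return [
        {"label": label, "count": count, "percentage": round(100 * count / total, 1)}
        for label, count in counts.items()
    ]
```

Applied to the example above, `build_stats({"Delivered": 30, "Bounced": 12}, 42)` reproduces the two entries shown in the response.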
GET /emails/{message_id} -- Get Email
Permission: read_only
Response (200):
{
"id": "msg_abc123",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"subject": "Hello",
"html": "<html>...</html>",
"status": "Delivered",
"created_at": "2026-03-10T04:16:54Z",
"delivered_at": "2026-03-10T04:16:58Z"
}
Errors: 404 (not found)
Logs
GET /logs -- Get Transaction Logs
Permission: read_only
Query Parameters:
| Param | Type | Default | Description |
|---|---|---|---|
| page | int | 1 | Page number |
| per_page | int | 20 | Items per page (1-100) |
| log_before | string | - | Filter before timestamp (ISO-8601) |
| log_after | string | - | Filter after timestamp (ISO-8601) |
| action | string | - | Filter by raw action (delivered, deferred, rejected) |
| sender | string | - | Filter by sender email |
| recipient | string | - | Filter by recipient email |
| message_id | string | - | Filter by message ID |
| source | string | - | Filter: api, smtp, or comma-separated |
| sorts | string | -timestamp | Sort field (- prefix = descending) |
Response (200):
{
"logs": [
{
"event": "Sender",
"timestamp": "2026-03-10T04:16:54Z",
"status": "Delivered",
"action": "250 2.0.0 OK",
"action_detail": "250 2.0.0 OK 1772679259 xyz",
"sender": "hello@example.com",
"recipient": "user@example.com",
"from_domain": "example.com",
"to_domain": "example.com",
"message_id": "msg_abc123",
"source": "API"
}
],
"total": 150,
"page": 1,
"per_page": 20,
"stats": [
{"label": "Delivered", "count": 100, "percentage": 66.7},
{"label": "Bounced", "count": 30, "percentage": 20.0},
{"label": "API", "count": 80, "percentage": 53.3},
{"label": "SMTP", "count": 70, "percentage": 46.7}
]
}
Notes: status is the mapped display label. action is the short SMTP code. action_detail is the full SMTP response. source is "API" or "SMTP" (computed by cross-referencing message_id with USEND DB).
Errors: 400 (invalid source, per_page out of range)
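A sketch of building the query string for this endpoint on the client, checking the two documented 400 conditions before any request is made. `build_logs_query` and `ALLOWED_SOURCES` are hypothetical names; the parameter names and defaults come from the table above.

```python
from urllib.parse import urlencode

ALLOWED_SOURCES = {"api", "smtp"}


def build_logs_query(page=1, per_page=20, source=None, sorts="-timestamp", **filters):
    """Return a URL-encoded query string for GET /logs.

    Extra keyword arguments (sender, recipient, message_id, action,
    log_before, log_after) are passed through when not None.
    """
    if not 1 <= per_page <= 100:
        raise ValueError("per_page must be between 1 and 100")
    params = {"page": page, "per_page": per_page, "sorts": sorts}
    if source:
        values = [part.strip().lower() for part in source.split(",")]
        unknown = set(values) - ALLOWED_SOURCES
        if unknown:
            raise ValueError(f"invalid source: {sorted(unknown)}")
        params["source"] = ",".join(values)
    params.update({key: value for key, value in filters.items() if value is not None})
    return urlencode(params)
```

The result can be appended to the /logs path after a question mark; the leading minus in the default `sorts` value requests newest-first ordering.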
Domains
POST /domains -- Create Domain
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
| domain | string | Yes |
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"status": "pending",
"dns_records": [
{"type": "TXT", "name": "@", "value": "v=spf1 include:...", "purpose": "spf"},
{"type": "MX", "name": "@", "value": "capmx-re30...", "priority": 5, "purpose": "mx"},
{"type": "TXT", "name": "dkim._domainkey", "value": "v=DKIM1;...", "purpose": "dkim"}
],
"created_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (duplicate domain, domain limit reached)
GET /domains -- List Domains
Permission: read_only
Query Parameters:
| Param | Type | Default | Description |
|---|---|---|---|
| page | int | 1 | Page number |
| per_page | int | 20 | Items per page (1-100) |
| search | string | "" | Search domain names |
Response (200):
{
"data": [
{"id": "dom_123", "domain": "example.com", "status": "verified", "created_at": "..."}
],
"total": 3,
"total_verified": 2,
"page": 1,
"per_page": 20
}
GET /domains/{domain_id} -- Get Domain
Permission: read_only | Path: domain_id in dom_123 format
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"status": "verified",
"dns_records": [...],
"verified_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (invalid format) | 404 (not found)
GET /domains/{domain_id}/quota -- Get Domain Quota
Permission: read_only
Response (200):
{
"domain": "example.com",
"daily_limit": 500,
"daily_used": 120,
"daily_remaining": 380,
"hourly_limit": 50,
"hourly_used": 10,
"hourly_remaining": 40,
"monthly_limit": 15500,
"monthly_used": 3600,
"monthly_remaining": 11900,
"pack_code": "DEDICATED_100K",
"pack_name": "Dedicated 100K"
}
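In the example above each `*_remaining` value equals the limit minus the used count (500 - 120 = 380). The sketch below shows that relationship and a simple pre-send check against the hourly and daily windows. Both helper names are hypothetical, and treating only the hourly and daily windows as blocking is an assumption based on the quota check described for sending.

```python
def remaining(quota, window):
    """Return limit minus used for "hourly", "daily" or "monthly", never below 0."""
    return max(quota[f"{window}_limit"] - quota[f"{window}_used"], 0)


def can_send(quota, count=1):
    """Return True when both the hourly and daily windows can absorb `count` emails."""
    return all(remaining(quota, window) >= count for window in ("hourly", "daily"))
```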
POST /domains/{domain_id}/verify -- Verify Domain
Permission: full_access | Request Body: None
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"status": "verified",
"dns_records": [
{"type": "TXT", "name": "@", "value": "...", "purpose": "spf", "verified": true},
{"type": "MX", "name": "@", "value": "...", "purpose": "mx", "verified": true},
{"type": "TXT", "name": "dkim._domainkey", "value": "...", "purpose": "dkim", "verified": false}
],
"verified_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (DNS verification failed) | 404
PATCH /domains/{domain_id} -- Update Domain Tracking
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
| click_tracking | bool | No |
| open_tracking | bool | No |
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"click_tracking": true,
"open_tracking": false
}
DELETE /domains/{domain_id} -- Delete Domain
Permission: full_access | Request Body: None
Response (200):
{"id": "dom_123", "deleted": true}
API Keys
POST /api-keys -- Create API Key
Permission: full_access
Request Body:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | Yes | - | Key name |
| permission | string | No | "full_access" | full_access, sending_access, or read_only |
| expires_in_days | int | No | null (never) | 7, 30, 60, 90, 180, or 365 |
Response (200):
{
"id": "key_abc123",
"name": "My API Key",
"key": "etrans_live_abc123def456...",
"permission": "full_access",
"expires_at": "2026-06-10T04:16:54Z",
"created_at": "2026-03-10T04:16:54Z"
}
Important: The key field is shown only once at creation. It cannot be retrieved later.
GET /api-keys -- List API Keys
Permission: read_only
Query Parameters: page (default: 1), per_page (default: 20, max: 100)
Response (200):
{
"data": [
{
"id": "key_abc123",
"name": "My API Key",
"permission": "full_access",
"expires_at": "2026-06-10T04:16:54Z",
"is_expired": false,
"created_at": "2026-03-10T04:16:54Z"
}
],
"total": 5,
"page": 1,
"per_page": 20
}
GET /api-keys/{key_id} -- Get API Key
Permission: read_only
Response (200):
{
"id": "key_abc123",
"name": "My API Key",
"permission": "full_access",
"expires_at": null,
"is_expired": false,
"active": true,
"created_at": "2026-03-10T04:16:54Z",
"updated_at": "2026-03-10T04:16:54Z"
}
DELETE /api-keys/{key_id} -- Delete API Key
Permission: full_access | Request Body: None
Response (200):
{"id": "key_abc123", "deleted": true}
Billing - Subscription
GET /billing/subscription/packs -- List Packages
Permission: read_only
Response (200):
{
"success": true,
"packages": [
{
"pack_code": "DEDICATED_100K",
"pack_type": "dedicated_ip",
"daily_email_limit": 100000,
"price": 500000.0,
"is_active": true
}
]
}
GET /billing/subscription/current-pack -- Get Current Pack
Permission: read_only
Response (200):
{
"pack_code": "DEDICATED_100K",
"pack_name": "Dedicated 100K",
"pack_type": "dedicated_ip",
"daily_email_limit": 100000,
"domain_limit": 5,
"quota": {
"daily_limit": 100000,
"daily_used": 5000,
"daily_remaining": 95000,
"hourly_limit": 4166,
"hourly_used": 200,
"hourly_remaining": 3966,
"monthly_limit": 3100000,
"monthly_used": 150000,
"monthly_remaining": 2950000
},
"domains": [
{"domain_id": "dom_123", "domain_name": "example.com", "status": "verified"}
]
}
POST /billing/subscription/subs -- Create Subscription
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
| pack_code | string | Yes |
| domain_id | int | Yes |
Response (200):
{
"success": true,
"message": "Subscription created successfully",
"domain_id": 123,
"pack_code": "DEDICATED_100K"
}
Errors: 400 | 403 (not Owner, insufficient balance) | 404
Billing - Pay As You Go
GET /billing/payg/packages -- Get PAYG Package
Permission: read_only
Response (200):
{
"pack_code": "PAYG_SHARED",
"pack_name": "Pay As You Go",
"pack_type": "shared_ip",
"pricing": {}
}
POST /billing/payg/subscriptions -- Create PAYG Subscription
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
| domain_id | int | Yes |
Response (200):
{
"success": true,
"message": "PAYG subscription created",
"subscription": {},
"previous_pack": "DEDICATED_100K"
}
GET /billing/payg/subscriptions/{domain_id} -- Get Current PAYG Subscription
Permission: read_only
Response (200):
{"subscription": {}}
Errors: 404 (no PAYG subscription)
DELETE /billing/payg/subscriptions/{domain_id} -- Cancel PAYG Subscription
Permission: full_access
Response (200):
{"success": true, "message": "PAYG subscription cancelled"}
Senders
POST /senders -- Create Sender
Permission: full_access
Request Body:
| Field | Type | Required | Constraints |
|---|---|---|---|
| account | string | Yes | Email address (e.g., noreply@example.com) |
| domain | string | Yes | Domain name |
| password | string | Yes | min 8 characters |
Response (200):
{
"id": "sender_123",
"account": "noreply@example.com",
"domain": "example.com",
"active": true,
"daily_limit": 100,
"hourly_limit": 4,
"created_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (sender already exists)
Testing
POST /testing/set-pack -- Set Pack (Testing Only)
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
| pack_code | string | Yes |
Response (200):
{
"pack_code": "DEDICATED_100K",
"pack_name": "Dedicated 100K",
"domain_limit": 5,
"daily_email_limit": 100000,
"domains_updated": 3
}
Sets the pack for ALL user domains. Skips billing API and Kafka events. Works even with 0 domains (stores user-level pack assignment).
Errors: 400 (invalid pack_code)
Health Check
GET /healthz
Auth: None
Response (200):
{"status": "ok", "service": "email-transactional"}
Command Bus (19 Use Cases)
| Domain | Commands | External Services |
|---|---|---|
| Email Sending | Send, Get, List | Kafka, Redis, MongoDB |
| Domains | Create, Verify, Get, List, Delete, UpdateTracking, GetQuota | BillingSync |
| Billing | GetPacks, GetCurrentPack, CreateSub, PAYG (4 cmds), SetPack | External Billing API |
| API Keys | Create, List, Get, Delete | Redis (cache eviction) |
| Senders | Create | Redis, Auth, BillingSync |
Business Rules
| Rule | Where | How |
|---|---|---|
| Domain limit | CreateDomainUseCase | Checks pack.domain_limit via domain subscriptions or user_pack fallback |
| Email quota | SendMailUseCase | Checks mkt_domain.daily/hourly limits |
| Rate limit tracking | mkt_domain table | daily_tracking/hourly_tracking counters |
| API key auth | auth.py | SHA-256 hashed keys, plaintext verified on request |
| SMTP credential cache | send.py -> Redis | AES-256-CBC encrypted, stored in Redis hash auth |
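The API key rule above (store a SHA-256 hash, check the plaintext on each request) can be sketched as follows. This is an illustration under stated assumptions and not the contents of auth.py: the function names are hypothetical, the `etrans_live_` prefix is taken from the example response, and the use of a hex digest and a constant-time comparison are choices made for this sketch.

```python
import hashlib
import hmac
import secrets


def hash_api_key(plaintext: str) -> str:
    """Return the SHA-256 hex digest that would be stored instead of the key."""
    return hashlib.sha256(plaintext.encode("utf-8")).hexdigest()


def generate_api_key(prefix: str = "etrans_live_"):
    """Create a new key; the plaintext is returned once, only the hash is kept."""
    plaintext = prefix + secrets.token_hex(24)
    return plaintext, hash_api_key(plaintext)


def verify_api_key(presented: str, stored_hash: str) -> bool:
    """Hash the presented key and compare it to the stored hash in constant time."""
    return hmac.compare_digest(hash_api_key(presented), stored_hash)
```

Storing only the digest is what makes the key unrecoverable after creation: the service can confirm a presented key but cannot show it again.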
API Environment Variables
| Category | Variable | Purpose |
|---|---|---|
| Databases | DB_URI | Billing PostgreSQL (read-only) |
| | USEND_DB_URI | Main app PostgreSQL |
| | PROD_BILLING_DB_URI | Prod billing DB (for staging sync) |
| Kafka | KAFKA_BOOTSTRAP_SERVERS | Kafka brokers |
| | KAFKA_TOPIC | Sendmail topic (default: sendmail) |
| | KAFKA_BILLING_TOPIC | Billing events topic |
| | KAFKA_SASL_* | SASL authentication |
| Redis | REDIS_URI | Redis connection |
| MongoDB | MONGODB_URI / MONGODB_DBNAME / MONGODB_COLLECTION | Log storage |
| Auth | MANAGE_AUTH_URI | BizFly auth API |
| | KEYSTONE_* | Keystone admin auth |
| Encryption | TOKEN_SECRET / TOKEN_IV | AES-256-CBC for SMTP credentials |
| Billing | BILLING_V4_URI | External billing API |
| DNS | DEFAULT_SPF_VALUE, DEFAULT_MX_* | DNS record templates |
| Monitoring | SENTRY_DSN | Error tracking |
| | APM_* | Elastic APM (currently disabled) |
Sendmail Worker (Go)
Purpose
Consumes email send requests from Kafka and delivers them via SMTP.
How It Works
See C4 Level 3: Sendmail Worker Components for the full component diagram.
Processing pipeline: Kafka Consumer → Worker Pool (10 goroutines) → Email Service → Redis (fetch creds) → AES Decrypt → SMTP Send
Key behaviors:
- Manual Kafka offset commit with back-pressure (blocks until task.Done)
- 5-minute timeout per email
- 5 retries with exponential backoff (1s, 2s, 4s, 8s, 16s)
- TLS opportunistic mode
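The retry schedule listed above can be illustrated with a short Python sketch. The worker itself is written in Go, so this is only a model of the behavior, not its code. It assumes one initial attempt followed by up to five retries, each preceded by a delay that doubles from one second; whether the real worker counts the first attempt among the five is not stated in this document. `send_with_retry` is a hypothetical name.

```python
import time


def send_with_retry(send, retries=5, base_delay=1.0, sleep=time.sleep):
    """Call send(); on failure retry up to `retries` times.

    Delays before the retries are base_delay * 2**n: 1s, 2s, 4s, 8s, 16s.
    The last error is re-raised when every attempt fails.
    """
    try:
        return send()
    except Exception as exc:
        last_error = exc
    for attempt in range(retries):
        sleep(base_delay * 2 ** attempt)
        try:
            return send()
        except Exception as exc:
            last_error = exc
    raise last_error
```

Passing a custom `sleep` callable makes the schedule easy to observe in a test without actually waiting.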
Kafka Message Format (Input)
{
"payload": {
"sender": { "name": "Sender Name", "email": "sender@example.com" },
"recipients": ["to@example.com"],
"cc": ["cc@example.com"],
"bcc": ["bcc@example.com"],
"subject": "Subject line",
"body": "<html>Email body</html>",
"message_id": "<unique-message-id>",
"attachments": [
{
"filename": "file.pdf",
"content": "<base64-encoded>",
"content_type": "application/pdf"
}
],
"user_id": "12345"
}
}
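For readers producing or inspecting these messages from Python, the structure above can be decoded into a typed object as sketched below. The `EmailPayload` dataclass and `parse_sendmail_message` function are hypothetical names, and treating cc, bcc, attachments and the sender name as optional is an assumption; the worker's own Go structs live in sendmail_worker/kafka/message.go.

```python
import json
from dataclasses import dataclass, field


@dataclass
class EmailPayload:
    """Python-side view of the "payload" object in a sendmail topic message."""
    sender_name: str
    sender_email: str
    recipients: list
    subject: str
    body: str
    message_id: str
    user_id: str
    cc: list = field(default_factory=list)
    bcc: list = field(default_factory=list)
    attachments: list = field(default_factory=list)


def parse_sendmail_message(raw: bytes) -> EmailPayload:
    """Decode a raw Kafka message value into an EmailPayload."""
    payload = json.loads(raw)["payload"]
    return EmailPayload(
        sender_name=payload["sender"].get("name", ""),
        sender_email=payload["sender"]["email"],
        recipients=payload["recipients"],
        subject=payload["subject"],
        body=payload["body"],
        message_id=payload["message_id"],
        user_id=payload["user_id"],
        cc=payload.get("cc", []),
        bcc=payload.get("bcc", []),
        attachments=payload.get("attachments", []),
    )
```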
Environment Variables
| Variable | Default | Purpose |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | required | Kafka brokers |
| KAFKA_TOPIC | required | Topic to consume (sendmail) |
| KAFKA_GROUP_ID | required | Consumer group ID |
| KAFKA_SASL_* | - | SASL authentication |
| REDIS_HOST / REDIS_PORT / REDIS_DB | required | Redis for SMTP creds |
| SMTP_HOST / SMTP_PORT | required | SMTP server |
| SMTP_SECURE | false | TLS mode |
| TOKEN_SECRET | required | AES-256 key (32 bytes) |
| TOKEN_IV | required | AES-256 IV (16 bytes) |
| WORKER_POOL_SIZE | 10 | Concurrent goroutines |
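The required/default split in the table above can be modeled as a small loader. This Python sketch only illustrates the validation rules; the real loader is the Go code in sendmail_worker/config/config.go. It assumes TOKEN_SECRET and TOKEN_IV are supplied as raw text whose UTF-8 length is checked (the actual encoding is not documented here), and `load_config` and `REQUIRED` are hypothetical names.

```python
import os

REQUIRED = (
    "KAFKA_BOOTSTRAP_SERVERS", "KAFKA_TOPIC", "KAFKA_GROUP_ID",
    "REDIS_HOST", "REDIS_PORT", "REDIS_DB",
    "SMTP_HOST", "SMTP_PORT",
    "TOKEN_SECRET", "TOKEN_IV",
)


def load_config(env=None):
    """Validate the worker's environment and apply the documented defaults."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError("missing required variables: " + ", ".join(missing))
    # Assumption: key and IV are raw text; AES-256-CBC needs 32 and 16 bytes.
    if len(env["TOKEN_SECRET"].encode("utf-8")) != 32:
        raise ValueError("TOKEN_SECRET must be 32 bytes")
    if len(env["TOKEN_IV"].encode("utf-8")) != 16:
        raise ValueError("TOKEN_IV must be 16 bytes")
    config = {name: env[name] for name in REQUIRED}
    config["SMTP_SECURE"] = env.get("SMTP_SECURE", "false").lower() == "true"
    config["WORKER_POOL_SIZE"] = int(env.get("WORKER_POOL_SIZE", "10"))
    return config
```

Failing fast on a wrong key or IV length at startup avoids discovering the problem only when the first credential fails to decrypt.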
Key Files
| File | Purpose |
|---|---|
| sendmail_worker/main.go | Entry point, wiring |
| sendmail_worker/config/config.go | Env var loading + validation |
| sendmail_worker/config/crypto.go | AES-256-CBC decryption |
| sendmail_worker/kafka/consumer.go | Sarama consumer with back-pressure |
| sendmail_worker/kafka/message.go | Message structs |
| sendmail_worker/worker/processor.go | Goroutine pool |
| sendmail_worker/email/service.go | SMTP sending + retry logic |
| sendmail_worker/redis/client.go | Credential fetch from Redis |
Fetch Log Worker (Go)
Purpose
Consumes SMTP delivery log events from Kafka (published by Filebeat) and stores them in MongoDB.
How It Works
See C4 Level 3: Fetch Log Worker Components for the full component diagram.
Processing pipeline: Filebeat → Kafka → Consumer → Worker Pool (10 goroutines) → Event Filter → MongoDB Insert
Key behaviors:
- Manual Kafka offset commit with back-pressure (blocks until task.Done)
- Default offset: earliest (processes historical events)
- Only stores: Sender, Queue, Deferred, Reject events
- Skips messages matching bizfly.vn pattern (internal emails)
- Auto-creates MongoDB indexes on startup
- Errors reported to Sentry
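The back-pressure behavior in the first bullet can be modeled in Python with a bounded queue standing in for the buffered channel and a `threading.Event` standing in for the Done channel. This is a conceptual sketch, not a port of the Go worker: the function name is hypothetical, a single consumer loop is shown (so tasks are processed one at a time, whereas the real consumer may run one loop per partition), and recording the offset even when the handler fails is an assumption.

```python
import queue
import threading


def consume_with_back_pressure(messages, handle, pool_size=10, capacity=100):
    """Feed messages to a worker pool and 'commit' each offset only after its
    task has signalled completion."""
    tasks = queue.Queue(maxsize=capacity)  # bounded, like the buffered channel
    committed = []

    def worker():
        while True:
            task = tasks.get()
            if task is None:       # sentinel: shut this worker down
                return
            message, done = task
            try:
                handle(message)
            except Exception:
                pass               # a real worker would report the error
            finally:
                done.set()         # signal completion either way

    threads = [threading.Thread(target=worker) for _ in range(pool_size)]
    for thread in threads:
        thread.start()
    for offset, message in enumerate(messages):
        done = threading.Event()
        tasks.put((message, done))  # blocks while the queue is full
        done.wait()                 # back-pressure: wait until the task is done
        committed.append(offset)    # stand-in for committing the Kafka offset
    for _ in threads:
        tasks.put(None)
    for thread in threads:
        thread.join()
    return committed
```

Because the offset is recorded only after `done` is set, a crash mid-task would leave that message uncommitted and eligible for redelivery.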
Log Event Format (Input from Kafka)
{
"Event": "Sender",
"@timestamp": "2026-02-01T10:30:15.000Z",
"SMTP_Action": "delivered",
"from": "hello@yourdomain.com",
"from_name": "Hello",
"recipient": "user@example.com",
"to_name": "User",
"response": "250 2.0.0 OK",
"from_domain": "yourdomain.com",
"Event_ID": "evt_123",
"message_id": "msg_1234567890",
"to_domain": "example.com",
"agent": {
"id": "agent_1",
"type": "filebeat",
"hostname": "host1",
"name": "agent-name",
"ephemeral_id": "eph_1"
}
}
Event Types
| Event Type | Meaning | Stored? |
|---|---|---|
| Sender | Email accepted by SMTP server | Yes |
| Queue | Email queued for delivery | Yes |
| Deferred | Delivery temporarily failed, will retry | Yes |
| Reject | Email rejected | Yes |
| Others | Various SMTP events | No (filtered out) |
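The filtering rule in the table above, combined with the internal-message skip, can be sketched as a single predicate. The constants mirror the default values of STORE_EVENT_TYPES and SKIP_MESSAGE_ID_PATTERN; matching the pattern as a plain substring of message_id is an assumption (the worker may use a regular expression), and `should_store` is a hypothetical name.

```python
STORE_EVENT_TYPES = {"Sender", "Queue", "Deferred", "Reject"}
SKIP_MESSAGE_ID_PATTERN = "bizfly.vn"


def should_store(event: dict) -> bool:
    """Return True when a log event passes both filters and should be inserted."""
    if event.get("Event") not in STORE_EVENT_TYPES:
        return False  # other SMTP events are dropped silently
    if SKIP_MESSAGE_ID_PATTERN in event.get("message_id", ""):
        return False  # internal message
    return True
```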
MongoDB Document (Output)
| MongoDB Field | Source |
|---|---|
| event | Event |
| timestamp | @timestamp |
| action | SMTP_Action |
| sender | from |
| recipient | recipient |
| response | response (full SMTP response) |
| from_domain | from_domain |
| message_id | message_id |
| to_domain | to_domain |
| event_id | Event_ID |
| agent_* | Agent metadata (type, name, hostname) |
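The field mapping above can be written as a small conversion function. This is a sketch, not the Go code in events/log_sender.go: the function name is hypothetical, the `agent_type`, `agent_name` and `agent_hostname` keys are an assumed expansion of `agent_*`, and the timestamp is kept as the original string rather than parsed into a date type.

```python
def to_document(event: dict) -> dict:
    """Convert a Filebeat log event into the MongoDB document shape."""
    agent = event.get("agent", {})
    return {
        "event": event.get("Event"),
        "timestamp": event.get("@timestamp"),  # kept as a string in this sketch
        "action": event.get("SMTP_Action"),
        "sender": event.get("from"),
        "recipient": event.get("recipient"),
        "response": event.get("response"),
        "from_domain": event.get("from_domain"),
        "message_id": event.get("message_id"),
        "to_domain": event.get("to_domain"),
        "event_id": event.get("Event_ID"),
        # Assumed key names for the agent_* fields.
        "agent_type": agent.get("type"),
        "agent_name": agent.get("name"),
        "agent_hostname": agent.get("hostname"),
    }
```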
MongoDB Indexes
| Index Name | Fields | Purpose |
|---|---|---|
| `idx_from_domain_timestamp` | `from_domain ASC, timestamp DESC` | Log queries filtered by domain |
| `idx_message_id` | `message_id` | Single email log lookup |
| `idx_event_timestamp` | `event ASC, timestamp DESC` | Event type filtering |
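For illustration, the three indexes can be expressed as data and applied on startup. This Python sketch assumes a pymongo-style collection object exposing `create_index(keys, name=...)`; the worker itself uses Go. The ascending direction for `message_id` is also an assumption, since the table gives no direction for it.

```python
ASC, DESC = 1, -1  # same numeric values as pymongo.ASCENDING / pymongo.DESCENDING

# (index name, key specification) pairs taken from the table above.
INDEXES = [
    ("idx_from_domain_timestamp", [("from_domain", ASC), ("timestamp", DESC)]),
    ("idx_message_id", [("message_id", ASC)]),  # direction assumed
    ("idx_event_timestamp", [("event", ASC), ("timestamp", DESC)]),
]


def ensure_indexes(collection):
    """Create each index on the given collection and return the index names.

    `collection` is any object with a pymongo-compatible create_index method.
    Creating an index that already exists with the same spec is a no-op in
    MongoDB, so this is safe to call on every startup.
    """
    return [collection.create_index(keys, name=name) for name, keys in INDEXES]
```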
Environment Variables
| Variable | Default | Purpose |
|---|---|---|
| `KAFKA_BOOTSTRAP_SERVERS` | required | Kafka brokers |
| `LOG_TOPIC` | required | Topic to consume (email-logs) |
| `LOG_GROUP_ID` | required | Consumer group ID |
| `KAFKA_SASL_*` | - | SASL authentication |
| `MONGODB_URI` | required | MongoDB connection |
| `MONGODB_DBNAME` | required | Database name |
| `MONGODB_COLLECTION` | required | Collection name |
| `LOG_WORKER_POOL_SIZE` | `10` | Concurrent goroutines |
| `LOG_PROCESS_TIMEOUT_SECONDS` | `10` | Processing timeout |
| `LOG_MONGO_INSERT_TIMEOUT_SECONDS` | `10` | Insert timeout |
| `STORE_EVENT_TYPES` | `Sender,Queue,Deferred,Reject` | Which events to store |
| `SKIP_MESSAGE_ID_PATTERN` | `bizfly.vn` | Skip internal messages |
| `SENTRY_DSN` | - | Error tracking |
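A minimal sketch of how these variables and defaults might be loaded, written in Python for illustration (the real loader lives in the Go `config/config.go`). The `Config` and `load_config` names are hypothetical, and the `KAFKA_SASL_*` variables are left out because their exact names are not listed above.

```python
import os
from dataclasses import dataclass

REQUIRED = (
    "KAFKA_BOOTSTRAP_SERVERS", "LOG_TOPIC", "LOG_GROUP_ID",
    "MONGODB_URI", "MONGODB_DBNAME", "MONGODB_COLLECTION",
)


@dataclass(frozen=True)
class Config:
    required: dict               # values of the required variables, by name
    pool_size: int
    process_timeout_s: int
    insert_timeout_s: int
    store_event_types: frozenset
    skip_message_id_pattern: str
    sentry_dsn: str


def load_config(env=None) -> Config:
    """Read settings from a mapping (os.environ by default), applying the defaults above."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError("missing required variables: " + ", ".join(missing))
    types = env.get("STORE_EVENT_TYPES", "Sender,Queue,Deferred,Reject")
    return Config(
        required={name: env[name] for name in REQUIRED},
        pool_size=int(env.get("LOG_WORKER_POOL_SIZE", "10")),
        process_timeout_s=int(env.get("LOG_PROCESS_TIMEOUT_SECONDS", "10")),
        insert_timeout_s=int(env.get("LOG_MONGO_INSERT_TIMEOUT_SECONDS", "10")),
        store_event_types=frozenset(t.strip() for t in types.split(",") if t.strip()),
        skip_message_id_pattern=env.get("SKIP_MESSAGE_ID_PATTERN", "bizfly.vn"),
        sentry_dsn=env.get("SENTRY_DSN", ""),
    )
```

Failing fast on missing required variables keeps a misconfigured worker from starting and silently consuming nothing.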
Key Files
| File | Purpose |
|---|---|
| `main.go` | Entry point, wiring |
| `config/config.go` | Env var loading + filter config |
| `kafka/consumer.go` | Sarama consumer with back-pressure |
| `worker/pool.go` | Goroutine pool + event filtering |
| `events/log_sender.go` | LogEvent struct + MongoDB document conversion |
| `mongo/client.go` | MongoDB insert + index management |
| `observability/sentry.go` | Custom Sentry error reporting |
Deployment
Docker Services
| Service | Dockerfile | Exposed Port |
|---|---|---|
| `email_transaction_api` | `./Dockerfile` | `8000` |
| `sendmail_worker` | `fetch_log_worker_go/sendmail_worker/Dockerfile` | - |
| `fetch_log_worker` | `fetch_log_worker_go/Dockerfile` | - |
Git Repositories
| Repo | Path | Remote |
|---|---|---|
| API (root) | `/email-transactional/` | `git@git.paas.vn:devteam/vce/email-transactional.git` |
| Combined Workers | `/email-transactional/fetch_log_worker_go/` | `git@git.paas.vn:devteam/vce/etrans-fetch-log-worker.git` |

The standalone `sendmail_worker/` repo is deprecated. Source of truth is `fetch_log_worker_go/sendmail_worker/`.
Status Mapping (API Display Labels)
The API maps raw SMTP statuses to user-friendly labels:
| Raw Status (DB/Logs) | Display Label | Source |
|---|---|---|
| `queued` | Queued | API sets on send |
| `sender` / `sent` | Sent | SMTP accepted |
| `delivered` / `delivery` | Delivered | SMTP delivered |
| `deferred` | Delayed | SMTP temporary failure |
| `bounced` / `reject` / `rejected` | Bounced | SMTP permanent failure |
Filters on `GET /emails` accept both raw and display names (for example, `delayed` is translated to `deferred`).
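A minimal sketch of the mapping and the filter translation, assuming a plain dictionary lookup. `DISPLAY_LABELS`, `display_label`, and `raw_statuses_for` are hypothetical names, and expanding a filter to every raw status that shares its label is an assumption about how the API behaves.

```python
# Raw status (DB/logs) -> display label, from the table above.
DISPLAY_LABELS = {
    "queued": "Queued",
    "sender": "Sent", "sent": "Sent",
    "delivered": "Delivered", "delivery": "Delivered",
    "deferred": "Delayed",
    "bounced": "Bounced", "reject": "Bounced", "rejected": "Bounced",
}


def display_label(raw_status: str) -> str:
    """Map a raw status to its label; unknown statuses pass through unchanged."""
    return DISPLAY_LABELS.get(raw_status.lower(), raw_status)


def raw_statuses_for(filter_value: str) -> list:
    """Translate a filter value (raw or display name) into raw statuses to query."""
    wanted = filter_value.strip().lower()
    # A raw name resolves to its label; anything else is treated as a label.
    label = DISPLAY_LABELS.get(wanted, wanted.capitalize())
    return sorted(raw for raw, shown in DISPLAY_LABELS.items() if shown == label)
```

For example, `raw_statuses_for("delayed")` yields `["deferred"]`, and an unrecognized value yields an empty list.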
State Machine Diagrams
Email Lifecycle
stateDiagram-v2
[*] --> Queued: POST /emails
Queued --> Sent: Sendmail Worker picks up<br>from Kafka & SMTP accepts
Queued --> Bounced: Sendmail Worker fails<br>after 5 retries
Sent --> Delivered: SMTP server confirms<br>delivery to recipient
Sent --> Deferred: SMTP temporary failure<br>(will retry)
Sent --> Bounced: SMTP permanent rejection<br>(invalid address, etc.)
Deferred --> Delivered: SMTP retry succeeds
Deferred --> Bounced: SMTP gives up<br>after retries
Deferred --> Deferred: SMTP retries again
Delivered --> [*]
Bounced --> [*]
note right of Queued
API saves to USEND DB
Publishes to Kafka
end note
note right of Sent
Sendmail Worker sends via SMTP
Filebeat captures logs
end note
note right of Deferred
Display label: "Delayed"
end note
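The transitions in the diagram above can be encoded as a lookup table, which is handy for validating status updates in tests. This is an illustrative sketch; `TRANSITIONS`, `can_transition`, and `is_terminal` are hypothetical names, not identifiers from the codebase.

```python
# Allowed next states for each email state, copied from the diagram above.
TRANSITIONS = {
    "Queued": {"Sent", "Bounced"},
    "Sent": {"Delivered", "Deferred", "Bounced"},
    "Deferred": {"Delivered", "Bounced", "Deferred"},
    "Delivered": set(),
    "Bounced": set(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if the diagram allows moving from `current` to `new`."""
    return new in TRANSITIONS.get(current, set())


def is_terminal(state: str) -> bool:
    """Delivered and Bounced have no outgoing transitions."""
    return state in TRANSITIONS and not TRANSITIONS[state]
```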
Domain Lifecycle
stateDiagram-v2
[*] --> Pending: POST /domains
Pending --> Verified: POST /domains/{id}/verify<br>(all DNS records pass)
Pending --> Pending: POST /domains/{id}/verify<br>(DNS check fails)
Verified --> Deleted: DELETE /domains/{id}
Pending --> Deleted: DELETE /domains/{id}
Deleted --> [*]
state Verified {
[*] --> Active
Active --> TrackingUpdated: PATCH /domains/{id}<br>(click/open tracking)
TrackingUpdated --> Active
}
note right of Pending
DNS records generated:
SPF, MX (x3), DKIM
User must configure DNS
end note
note right of Verified
Domain can send emails
MKTDomain created with
daily/hourly limits
end note
API Key Lifecycle
stateDiagram-v2
[*] --> Active: POST /api-keys<br>(key shown once)
Active --> Revoked: DELETE /api-keys/{id}
Active --> Expired: expires_at reached
Revoked --> [*]
Expired --> [*]
state Active {
[*] --> FullAccess
[*] --> SendingAccess
[*] --> ReadOnly
}
note right of Active
Permission set at creation:
full_access > sending_access > read_only
Expiry: 7/30/60/90/180/365 days or never
end note
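A sketch of the permission ordering and expiry rules, under two assumptions: `full_access > sending_access > read_only` means each level includes the ones below it, and a revoked key reports as Revoked even if it has also expired. All function names here are hypothetical.

```python
from datetime import datetime, timedelta, timezone

LEVELS = {"read_only": 1, "sending_access": 2, "full_access": 3}
EXPIRY_DAYS = (7, 30, 60, 90, 180, 365)  # None means the key never expires


def allows(key_permission: str, required: str) -> bool:
    """True if a key with `key_permission` may perform an action needing `required`."""
    return LEVELS[key_permission] >= LEVELS[required]


def expires_at(created_at: datetime, days=None):
    """Compute the expiry timestamp, or None for a key that never expires."""
    if days is None:
        return None
    if days not in EXPIRY_DAYS:
        raise ValueError(f"unsupported expiry: {days} days")
    return created_at + timedelta(days=days)


def key_state(revoked: bool, expiry, now: datetime) -> str:
    """Resolve the lifecycle state shown in the diagram above."""
    if revoked:
        return "Revoked"
    if expiry is not None and now >= expiry:
        return "Expired"
    return "Active"
```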
Subscription Lifecycle
stateDiagram-v2
[*] --> NoSubscription
NoSubscription --> Dedicated: POST /billing/subscription/subs<br>(pack_code + domain_id)
NoSubscription --> PAYG: POST /billing/payg/subscriptions<br>(domain_id)
Dedicated --> Dedicated: POST /billing/subscription/subs<br>(resize to different pack)
Dedicated --> PAYG: POST /billing/payg/subscriptions<br>(switch to PAYG)
Dedicated --> NoSubscription: Cancel
PAYG --> Dedicated: POST /billing/subscription/subs<br>(switch to dedicated)
PAYG --> NoSubscription: DELETE /billing/payg/subscriptions/{id}
note right of Dedicated
domain_subscription_status.pack_id
points to email_transaction_pack
MKTDomain: daily/hourly limits set
end note
note right of PAYG
pack_code = "PAYG_SHARED"
MKTDomain: daily=-1, hourly=-1
(unlimited)
end note
Sender Lifecycle
stateDiagram-v2
[*] --> Created: POST /senders
state Created {
[*] --> Active
Active: SMTP credentials cached in Redis
Active: MKTDomain created (daily=100, hourly=4)
Active: User created in Billing DB
}
note right of Created
Creates:
1. User record (Billing DB)
2. MKTDomain (rate limits)
3. Temp password
4. SMTP creds in Redis (encrypted)
end note
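The MKTDomain limit values in the diagrams above (daily=-1, hourly=-1 for PAYG; daily=100, hourly=4 for a new sender) imply a check along these lines. This is a sketch only: `within_limits` is a hypothetical name, and treating a limit as reached when the count equals it is an assumption.

```python
UNLIMITED = -1  # sentinel used for PAYG domains


def within_limits(sent_today: int, sent_this_hour: int,
                  daily_limit: int, hourly_limit: int) -> bool:
    """Return True if one more email may be sent under both limits."""
    def ok(used: int, limit: int) -> bool:
        # -1 disables the limit; otherwise the count must stay below it.
        return limit == UNLIMITED or used < limit

    return ok(sent_today, daily_limit) and ok(sent_this_hour, hourly_limit)
```

With the sender defaults, `within_limits(10, 4, 100, 4)` is false because the hourly allowance is used up, while a PAYG domain passes regardless of volume.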
Email Sending Flow (Detailed State Machine)
stateDiagram-v2
[*] --> ValidateAuth: POST /emails
ValidateAuth --> CheckQuota: Auth OK
ValidateAuth --> Rejected: 401/403
CheckQuota --> CacheCredentials: Quota available
CheckQuota --> Rejected: 400 Quota exceeded
CacheCredentials --> SaveToDB: SMTP creds cached in Redis
SaveToDB --> PublishKafka: EmailMessage saved (queued)
PublishKafka --> ResponseSent: Published to Kafka topic
ResponseSent --> [*]: 200 OK returned to client
state "Sendmail Worker (Async)" as Worker {
[*] --> ConsumeMessage
ConsumeMessage --> FetchCreds: Read from Redis
FetchCreds --> DecryptCreds: AES-256-CBC decrypt
DecryptCreds --> ConnectSMTP
ConnectSMTP --> SendEmail
SendEmail --> Success: 250 OK
SendEmail --> RetryBackoff: Temporary failure
RetryBackoff --> ConnectSMTP: Attempt 2-5
RetryBackoff --> Failed: All 5 attempts failed
}
Rejected --> [*]
note right of Worker
5-minute timeout per email
Exponential backoff: 1s, 2s, 4s, 8s, 16s
end note
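The retry loop in the worker sub-state can be sketched as below. This is a Python illustration of the Go worker's behavior, with hypothetical names (`send_with_retry`, `TemporaryFailure`). The source lists five delays but does not say whether the worker waits after the final attempt; this sketch assumes it does not. The 5-minute per-email timeout is not modeled.

```python
import time

MAX_ATTEMPTS = 5
BASE_DELAY_S = 1.0


class TemporaryFailure(Exception):
    """Stands in for a retryable SMTP error."""


def backoff_delay(attempt: int) -> float:
    """Delay after the given 1-based failed attempt: 1s, 2s, 4s, 8s, 16s."""
    return BASE_DELAY_S * 2 ** (attempt - 1)


def send_with_retry(send, sleep=time.sleep):
    """Call `send` up to MAX_ATTEMPTS times, doubling the wait between tries."""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            return send()
        except TemporaryFailure:
            if attempt == MAX_ATTEMPTS:
                raise  # all attempts failed; the caller marks the email as failed
            sleep(backoff_delay(attempt))
```

Passing `sleep` as a parameter lets tests record the delays instead of actually waiting.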
Domain Creation with Limit Check
stateDiagram-v2
[*] --> CheckPackLimit: POST /domains
CheckPackLimit --> CheckDomainSub: Has domains with subscription?
CheckPackLimit --> CheckUserPack: No domain subscriptions
CheckDomainSub --> GetDomainLimit: Found pack via domain_subscription_status
CheckUserPack --> GetDomainLimit: Found pack via user_pack table
CheckUserPack --> Unlimited: No user_pack found
GetDomainLimit --> Unlimited: domain_limit = 0
GetDomainLimit --> CountDomains: domain_limit > 0
CountDomains --> CreateAllowed: count < limit
CountDomains --> LimitReached: count >= limit
Unlimited --> CheckDuplicate
CreateAllowed --> CheckDuplicate
CheckDuplicate --> GenerateDKIM: Domain is unique
CheckDuplicate --> Rejected: 400 Domain exists
GenerateDKIM --> SaveDomain: DKIM keys generated
SaveDomain --> SyncProd: Saved to Billing DB
SyncProd --> ReturnDNS: Synced to production
ReturnDNS --> [*]: 200 OK (DNS records)
LimitReached --> [*]: 400 Domain limit reached
Rejected --> [*]
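The limit-check branch of the diagram above reduces to two small decisions: which pack applies, and whether its `domain_limit` allows another domain. The sketch below assumes packs are dictionaries with a `domain_limit` key; `resolve_domain_limit` and `domain_limit_decision` are hypothetical names.

```python
from typing import Optional


def resolve_domain_limit(domain_sub_pack: Optional[dict],
                         user_pack: Optional[dict]) -> Optional[int]:
    """Pick the pack as the diagram does: domain subscription first, then user_pack.

    Returns None when no pack is found, which the diagram treats as unlimited.
    """
    pack = domain_sub_pack if domain_sub_pack is not None else user_pack
    return None if pack is None else pack["domain_limit"]


def domain_limit_decision(domain_limit: Optional[int], current_count: int) -> str:
    """Return 'unlimited', 'allowed', or 'limit_reached' for a new domain request."""
    if domain_limit is None or domain_limit == 0:
        return "unlimited"  # no pack found, or domain_limit = 0
    return "allowed" if current_count < domain_limit else "limit_reached"
```

Both `unlimited` and `allowed` continue to the duplicate check; `limit_reached` corresponds to the 400 response.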