Email Transactional - System Overview
Architecture documentation following the C4 model — zoom from high-level context down to component details.
C4 Level 1: System Context Diagram
Who uses the system and what external systems does it depend on?
C4 Level 2: Container Diagram
What are the running applications, data stores, and how do they communicate?
C4 Level 3: Component Diagrams
API Server Components
What are the internal modules of the Python API?
Sendmail Worker Components
Fetch Log Worker Components
Data Flow: Send Email (Sequence)
Three Services
| Service | Language | Port | Role |
|---|---|---|---|
| API Server | Python 3.10 / FastAPI | 8000 | HTTP endpoints, business logic, publishes to Kafka |
| Sendmail Worker | Go 1.24 | - | Consumes email requests from Kafka, delivers via SMTP |
| Fetch Log Worker | Go 1.24 | - | Consumes SMTP delivery logs from Kafka, stores in MongoDB |
End-to-End Flow: Sending an Email
| Step | Component | Actions |
|---|---|---|
| 1 | Client | Sends POST /emails with email payload |
| 2 | API | Validates auth (Bearer/API key) |
Checks quotas (daily/hourly from mkt_domain in Billing DB) | ||
| Caches SMTP credentials to Redis (AES-256-CBC encrypted) | ||
Saves EmailMessage record to USEND DB (status: queued) | ||
Publishes email payload to Kafka sendmail topic | ||
| 3 | Sendmail Worker | Consumes message from Kafka |
Fetches SMTP credentials from Redis (hash auth, key=user_id) | ||
| Decrypts credentials with AES-256-CBC | ||
| Sends email via SMTP (go-mail library, 5 retries, exponential backoff) | ||
| 4 | SMTP Server | Delivers the email, writes delivery logs to disk |
| 5 | Filebeat | Parses SMTP logs, publishes structured events to Kafka email-logs topic |
| 6 | Fetch Log Worker | Consumes log event from Kafka |
Filters by event type (Sender/Queue/Deferred/Reject) | ||
Skips internal messages (bizfly.vn pattern) | ||
| Inserts log document into MongoDB | ||
| 7 | Client | Queries GET /logs to see delivery status |
| API | Reads from MongoDB, maps raw statuses to display labels |
Shared Infrastructure
| Infrastructure | Who Writes | Who Reads | Shared Config |
|---|---|---|---|
| Kafka (sendmail topic) | API (Python) | Sendmail Worker (Go) | KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC |
| Kafka (email-logs topic) | Filebeat (external) | Fetch Log Worker (Go) | KAFKA_BOOTSTRAP_SERVERS, LOG_TOPIC |
| Redis | API (caches SMTP creds) | Sendmail Worker (decrypts creds) | REDIS_URI, TOKEN_SECRET, TOKEN_IV |
| MongoDB | Fetch Log Worker | API (reads logs) | MONGODB_URI, MONGODB_DBNAME |
| Billing DB (PostgreSQL) | External / Billing Sync | API (read-only queries) | DB_URI |
| USEND DB (PostgreSQL) | API | API | USEND_DB_URI |
API Service (Python)
Tech Stack
- Python 3.10, FastAPI, SQLAlchemy 2.0, Pydantic
- Clean Architecture: Domain -> App -> Adapters -> API
- Command Bus pattern for all use cases
Dual Database Architecture
| Database | Config | Access | Tables |
|---|---|---|---|
| Billing DB | DB_URI | READ-ONLY | domain, dkim, mkt_domain, users, temporary_password |
| USEND DB | USEND_DB_URI | Read/Write | api_keys, email_messages, email_transaction_pack, domain_subscription_status, user_pack, domain_tracking_settings, transport |
CRITICAL: Never run migrations or modify schema on the Billing DB. It is managed externally.
Authentication
| Method | Header | Permission |
|---|---|---|
| Bearer Token | Authorization: Bearer <JWT> | Always full_access |
| API Key | X-API-KEY: <key> | Depends on key's configured level |
| Cookie | Session cookie | Passthrough |
Permission Hierarchy: read_only < sending_access < full_access (higher includes lower)
API Endpoints Quick Reference
| Group | Method | Path | Auth | Description |
|---|---|---|---|---|
| Emails | POST | /emails | sending_access | Send email |
| POST | /emails/batch | sending_access | Send batch emails | |
| GET | /emails | read_only | List emails (paginated) | |
| GET | /emails/{message_id} | read_only | Get email details | |
| Logs | GET | /logs | read_only | Get transaction logs |
| Domains | POST | /domains | full_access | Create domain |
| GET | /domains | read_only | List domains | |
| GET | /domains/{domain_id} | read_only | Get domain details | |
| GET | /domains/{domain_id}/quota | read_only | Get domain quota | |
| POST | /domains/{domain_id}/verify | full_access | Verify domain DNS | |
| PATCH | /domains/{domain_id} | full_access | Update tracking | |
| DELETE | /domains/{domain_id} | full_access | Delete domain | |
| API Keys | POST | /api-keys | full_access | Create API key |
| GET | /api-keys | read_only | List API keys | |
| GET | /api-keys/{key_id} | read_only | Get API key | |
| DELETE | /api-keys/{key_id} | full_access | Delete API key | |
| Billing | GET | /billing/subscription/packs | read_only | List packages |
| GET | /billing/subscription/current-pack | read_only | Get current pack | |
| POST | /billing/subscription/subs | full_access | Create subscription | |
| GET | /billing/payg/packages | read_only | Get PAYG packages | |
| POST | /billing/payg/subscriptions | full_access | Create PAYG sub | |
| GET | /billing/payg/subscriptions/{domain_id} | read_only | Get current PAYG sub | |
| DELETE | /billing/payg/subscriptions/{domain_id} | full_access | Cancel PAYG sub | |
| Senders | POST | /senders | full_access | Create SMTP sender |
| Testing | POST | /testing/set-pack | full_access | Set pack for testing |
| Health | GET | /healthz | none | Health check |
Detailed API Reference
Emails
POST /emails -- Send Email
Permission: sending_access
Request Body:
| Field | Type | Required | Description |
|---|---|---|---|
from | string | Yes | "email@domain.com" or "Name <email@domain.com>" |
to | string[] | Yes | Recipient email addresses |
subject | string | Yes | Email subject |
html | string | No* | HTML body (*at least one of html or text required) |
text | string | No* | Plain text body |
cc | string[] | No | CC recipients |
bcc | string[] | No | BCC recipients |
reply_to | string | No | Reply-to address |
attachments | object[] | No | Attachments (see below) |
headers | object | No | Custom email headers |
tags | object[] | No | Tags (name/value pairs) |
template_id | string | No | Template identifier |
template_data | object | No | Template rendering data |
Attachment object:
| Field | Type | Required |
|---|---|---|
filename | string | Yes |
content | string | Yes (base64-encoded) |
content_type | string | Yes |
Response (200):
{
"id": "<message-id>",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"status": "queued",
"created_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (no body, invalid attachment, size > 25MB, quota exceeded, domain not verified) | 401 | 403
POST /emails/batch -- Send Batch Emails
Permission: sending_access
Request Body:
| Field | Type | Required |
|---|---|---|
emails | SendEmailRequest[] | Yes (same schema as single send) |
Response (200):
{
"data": [{"id": "<message-id>"}],
"errors": [{"index": 2, "error": "Domain not verified"}]
}
Per-email errors are returned in the errors array, not as HTTP errors.
GET /emails -- List Emails
Permission: read_only
Query Parameters:
| Param | Type | Default | Description |
|---|---|---|---|
page | int | 1 | Page number (min: 1) |
per_page | int | 20 | Items per page (1-100) |
status | string | - | Filter: queued,sent,delivered,bounced,deferred,delayed (comma-separated) |
created_at_from | string | - | ISO-8601 UTC lower bound |
created_at_to | string | - | ISO-8601 UTC upper bound |
Response (200):
{
"data": [
{
"id": "msg_abc123",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"subject": "Hello",
"status": "Delivered",
"created_at": "2026-03-10T04:16:54Z"
}
],
"total": 42,
"page": 1,
"per_page": 20,
"stats": [
{"label": "Delivered", "count": 30, "percentage": 71.4},
{"label": "Bounced", "count": 12, "percentage": 28.6}
]
}
Notes: Status values in response are display labels (Delivered, Delayed, Bounced, Sent, Queued). Filter accepts both raw (deferred) and display (delayed) names.
Errors: 400 (invalid status, invalid timestamp, from > to, per_page out of range)
GET /emails/{message_id} -- Get Email
Permission: read_only
Response (200):
{
"id": "msg_abc123",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"subject": "Hello",
"html": "<html>...</html>",
"status": "Delivered",
"created_at": "2026-03-10T04:16:54Z",
"delivered_at": "2026-03-10T04:16:58Z"
}
Errors: 404 (not found)
Logs
GET /logs -- Get Transaction Logs
Permission: read_only
Query Parameters:
| Param | Type | Default | Description |
|---|---|---|---|
page | int | 1 | Page number |
per_page | int | 20 | Items per page (1-100) |
log_before | string | - | Filter before timestamp (ISO-8601) |
log_after | string | - | Filter after timestamp (ISO-8601) |
action | string | - | Filter by raw action (delivered, deferred, rejected) |
sender | string | - | Filter by sender email |
recipient | string | - | Filter by recipient email |
message_id | string | - | Filter by message ID |
source | string | - | Filter: api, smtp, or comma-separated |
sorts | string | -timestamp | Sort field (- prefix = descending) |
Response (200):
{
"logs": [
{
"event": "Sender",
"timestamp": "2026-03-10T04:16:54Z",
"status": "Delivered",
"action": "250 2.0.0 OK",
"action_detail": "250 2.0.0 OK 1772679259 xyz",
"sender": "hello@example.com",
"recipient": "user@example.com",
"from_domain": "example.com",
"to_domain": "example.com",
"message_id": "msg_abc123",
"source": "API"
}
],
"total": 150,
"page": 1,
"per_page": 20,
"stats": [
{"label": "Delivered", "count": 100, "percentage": 66.7},
{"label": "Bounced", "count": 30, "percentage": 20.0},
{"label": "API", "count": 80, "percentage": 53.3},
{"label": "SMTP", "count": 70, "percentage": 46.7}
]
}
Notes: status is the mapped display label. action is the short SMTP code. action_detail is the full SMTP response. source is "API" or "SMTP" (computed by cross-referencing message_id with USEND DB).
Errors: 400 (invalid source, per_page out of range)
Domains
POST /domains -- Create Domain
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
domain | string | Yes |
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"status": "pending",
"dns_records": [
{"type": "TXT", "name": "@", "value": "v=spf1 include:...", "purpose": "spf"},
{"type": "MX", "name": "@", "value": "capmx-re30...", "priority": 5, "purpose": "mx"},
{"type": "TXT", "name": "dkim._domainkey", "value": "v=DKIM1;...", "purpose": "dkim"}
],
"created_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (duplicate domain, domain limit reached)
GET /domains -- List Domains
Permission: read_only
Query Parameters:
| Param | Type | Default | Description |
|---|---|---|---|
page | int | 1 | Page number |
per_page | int | 20 | Items per page (1-100) |
search | string | "" | Search domain names |
Response (200):
{
"data": [
{"id": "dom_123", "domain": "example.com", "status": "verified", "created_at": "..."}
],
"total": 3,
"total_verified": 2,
"page": 1,
"per_page": 20
}
GET /domains/{domain_id} -- Get Domain
Permission: read_only | Path: domain_id in dom_123 format
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"status": "verified",
"dns_records": [...],
"verified_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (invalid format) | 404 (not found)
GET /domains/{domain_id}/quota -- Get Domain Quota
Permission: read_only
Response (200):
{
"domain": "example.com",
"daily_limit": 500,
"daily_used": 120,
"daily_remaining": 380,
"hourly_limit": 50,
"hourly_used": 10,
"hourly_remaining": 40,
"monthly_limit": 15500,
"monthly_used": 3600,
"monthly_remaining": 11900,
"pack_code": "DEDICATED_100K",
"pack_name": "Dedicated 100K"
}
POST /domains/{domain_id}/verify -- Verify Domain
Permission: full_access | Request Body: None
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"status": "verified",
"dns_records": [
{"type": "TXT", "name": "@", "value": "...", "purpose": "spf", "verified": true},
{"type": "MX", "name": "@", "value": "...", "purpose": "mx", "verified": true},
{"type": "TXT", "name": "dkim._domainkey", "value": "...", "purpose": "dkim", "verified": false}
],
"verified_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (DNS verification failed) | 404
PATCH /domains/{domain_id} -- Update Domain Tracking
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
click_tracking | bool | No |
open_tracking | bool | No |
Response (200):
{
"id": "dom_123",
"domain": "example.com",
"click_tracking": true,
"open_tracking": false
}
DELETE /domains/{domain_id} -- Delete Domain
Permission: full_access | Request Body: None
Response (200):
{"id": "dom_123", "deleted": true}
API Keys
POST /api-keys -- Create API Key
Permission: full_access
Request Body:
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
name | string | Yes | - | Key name |
permission | string | No | "full_access" | full_access, sending_access, or read_only |
expires_in_days | int | No | null (never) | 7, 30, 60, 90, 180, or 365 |
Response (200):
{
"id": "key_abc123",
"name": "My API Key",
"key": "etrans_live_abc123def456...",
"permission": "full_access",
"expires_at": "2026-06-10T04:16:54Z",
"created_at": "2026-03-10T04:16:54Z"
}
Important: The
keyfield is shown only once at creation. It cannot be retrieved later.
GET /api-keys -- List API Keys
Permission: read_only
Query Parameters: page (default: 1), per_page (default: 20, max: 100)
Response (200):
{
"data": [
{
"id": "key_abc123",
"name": "My API Key",
"permission": "full_access",
"expires_at": "2026-06-10T04:16:54Z",
"is_expired": false,
"created_at": "2026-03-10T04:16:54Z"
}
],
"total": 5,
"page": 1,
"per_page": 20
}
GET /api-keys/{key_id} -- Get API Key
Permission: read_only
Response (200):
{
"id": "key_abc123",
"name": "My API Key",
"permission": "full_access",
"expires_at": null,
"is_expired": false,
"active": true,
"created_at": "2026-03-10T04:16:54Z",
"updated_at": "2026-03-10T04:16:54Z"
}
DELETE /api-keys/{key_id} -- Delete API Key
Permission: full_access | Request Body: None
Response (200):
{"id": "key_abc123", "deleted": true}
Billing - Subscription
GET /billing/subscription/packs -- List Packages
Permission: read_only
Response (200):
{
"success": true,
"packages": [
{
"pack_code": "DEDICATED_100K",
"pack_type": "dedicated_ip",
"daily_email_limit": 100000,
"price": 500000.0,
"is_active": true
}
]
}
GET /billing/subscription/current-pack -- Get Current Pack
Permission: read_only
Response (200):
{
"pack_code": "DEDICATED_100K",
"pack_name": "Dedicated 100K",
"pack_type": "dedicated_ip",
"daily_email_limit": 100000,
"domain_limit": 5,
"quota": {
"daily_limit": 100000,
"daily_used": 5000,
"daily_remaining": 95000,
"hourly_limit": 4166,
"hourly_used": 200,
"hourly_remaining": 3966,
"monthly_limit": 3100000,
"monthly_used": 150000,
"monthly_remaining": 2950000
},
"domains": [
{"domain_id": "dom_123", "domain_name": "example.com", "status": "verified"}
]
}
POST /billing/subscription/subs -- Create Subscription
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
pack_code | string | Yes |
domain_id | int | Yes |
Response (200):
{
"success": true,
"message": "Subscription created successfully",
"domain_id": 123,
"pack_code": "DEDICATED_100K"
}
Errors: 400 | 403 (not Owner, insufficient balance) | 404
Billing - Pay As You Go
GET /billing/payg/packages -- Get PAYG Package
Permission: read_only
Response (200):
{
"pack_code": "PAYG_SHARED",
"pack_name": "Pay As You Go",
"pack_type": "shared_ip",
"pricing": {}
}
POST /billing/payg/subscriptions -- Create PAYG Subscription
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
domain_id | int | Yes |
Response (200):
{
"success": true,
"message": "PAYG subscription created",
"subscription": {},
"previous_pack": "DEDICATED_100K"
}
GET /billing/payg/subscriptions/{domain_id} -- Get Current PAYG Subscription
Permission: read_only
Response (200):
{"subscription": {}}
Errors: 404 (no PAYG subscription)
DELETE /billing/payg/subscriptions/{domain_id} -- Cancel PAYG Subscription
Permission: full_access
Response (200):
{"success": true, "message": "PAYG subscription cancelled"}
Senders
POST /senders -- Create Sender
Permission: full_access
Request Body:
| Field | Type | Required | Constraints |
|---|---|---|---|
account | string | Yes | Email address (e.g., noreply@example.com) |
domain | string | Yes | Domain name |
password | string | Yes | min 8 characters |
Response (200):
{
"id": "sender_123",
"account": "noreply@example.com",
"domain": "example.com",
"active": true,
"daily_limit": 100,
"hourly_limit": 4,
"created_at": "2026-03-10T04:16:54Z"
}
Errors: 400 (sender already exists)
Testing
POST /testing/set-pack -- Set Pack (Testing Only)
Permission: full_access
Request Body:
| Field | Type | Required |
|---|---|---|
pack_code | string | Yes |
Response (200):
{
"pack_code": "DEDICATED_100K",
"pack_name": "Dedicated 100K",
"domain_limit": 5,
"daily_email_limit": 100000,
"domains_updated": 3
}
Sets the pack for ALL user domains. Skips billing API and Kafka events. Works even with 0 domains (stores user-level pack assignment).
Errors: 400 (invalid pack_code)
Health Check
GET /healthz
Auth: None
Response (200):
{"status": "ok", "service": "email-transactional"}
Command Bus (19 Use Cases)
| Domain | Commands | External Services |
|---|---|---|
| Email Sending | Send, Get, List | Kafka, Redis, MongoDB |
| Domains | Create, Verify, Get, List, Delete, UpdateTracking, GetQuota | BillingSync |
| Billing | GetPacks, GetCurrentPack, CreateSub, PAYG (4 cmds), SetPack | External Billing API |
| API Keys | Create, List, Get, Delete | Redis (cache eviction) |
| Senders | Create | Redis, Auth, BillingSync |
Business Rules
| Rule | Where | How |
|---|---|---|
| Domain limit | CreateDomainUseCase | Checks pack.domain_limit via domain subscriptions or user_pack fallback |
| Email quota | SendMailUseCase | Checks mkt_domain.daily/hourly limits |
| Rate limit tracking | mkt_domain table | daily_tracking/hourly_tracking counters |
| API key auth | auth.py | SHA-256 hashed keys, plaintext verified on request |
| SMTP credential cache | send.py -> Redis | AES-256-CBC encrypted, stored in Redis hash auth |
API Environment Variables
| Category | Variable | Purpose |
|---|---|---|
| Databases | DB_URI | Billing PostgreSQL (read-only) |
USEND_DB_URI | Main app PostgreSQL | |
PROD_BILLING_DB_URI | Prod billing DB (for staging sync) | |
| Kafka | KAFKA_BOOTSTRAP_SERVERS | Kafka brokers |
KAFKA_TOPIC | Sendmail topic (default: sendmail) | |
KAFKA_BILLING_TOPIC | Billing events topic | |
KAFKA_SASL_* | SASL authentication | |
| Redis | REDIS_URI | Redis connection |
| MongoDB | MONGODB_URI / MONGODB_DBNAME / MONGODB_COLLECTION | Log storage |
| Auth | MANAGE_AUTH_URI | BizFly auth API |
KEYSTONE_* | Keystone admin auth | |
| Encryption | TOKEN_SECRET / TOKEN_IV | AES-256-CBC for SMTP credentials |
| Billing | BILLING_V4_URI | External billing API |
| DNS | DEFAULT_SPF_VALUE, DEFAULT_MX_* | DNS record templates |
| Monitoring | SENTRY_DSN | Error tracking |
APM_* | Elastic APM (currently disabled) |
Sendmail Worker (Go)
Purpose
Consumes email send requests from Kafka and delivers them via SMTP.
How It Works
See C4 Level 3: Sendmail Worker Components for the full component diagram.
Processing pipeline: Kafka Consumer → Worker Pool (10 goroutines) → Email Service → Redis (fetch creds) → AES Decrypt → SMTP Send
Key behaviors:
- Manual Kafka offset commit with back-pressure (blocks until
task.Done) - 5-minute timeout per email
- 5 retries with exponential backoff (1s, 2s, 4s, 8s, 16s)
- TLS opportunistic mode
Kafka Message Format (Input)
{
"payload": {
"sender": { "name": "Sender Name", "email": "sender@example.com" },
"recipients": ["to@example.com"],
"cc": ["cc@example.com"],
"bcc": ["bcc@example.com"],
"subject": "Subject line",
"body": "<html>Email body</html>",
"message_id": "<unique-message-id>",
"attachments": [
{
"filename": "file.pdf",
"content": "<base64-encoded>",
"content_type": "application/pdf"
}
],
"user_id": "12345"
}
}
Environment Variables
| Variable | Default | Purpose |
|---|---|---|
KAFKA_BOOTSTRAP_SERVERS | required | Kafka brokers |
KAFKA_TOPIC | required | Topic to consume (sendmail) |
KAFKA_GROUP_ID | required | Consumer group ID |
KAFKA_SASL_* | - | SASL authentication |
REDIS_HOST / REDIS_PORT / REDIS_DB | required | Redis for SMTP creds |
SMTP_HOST / SMTP_PORT | required | SMTP server |
SMTP_SECURE | false | TLS mode |
TOKEN_SECRET | required | AES-256 key (32 bytes) |
TOKEN_IV | required | AES-256 IV (16 bytes) |
WORKER_POOL_SIZE | 10 | Concurrent goroutines |
Key Files
| File | Purpose |
|---|---|
sendmail_worker/main.go | Entry point, wiring |
sendmail_worker/config/config.go | Env var loading + validation |
sendmail_worker/config/crypto.go | AES-256-CBC decryption |
sendmail_worker/kafka/consumer.go | Sarama consumer with back-pressure |
sendmail_worker/kafka/message.go | Message structs |
sendmail_worker/worker/processor.go | Goroutine pool |
sendmail_worker/email/service.go | SMTP sending + retry logic |
sendmail_worker/redis/client.go | Credential fetch from Redis |
Fetch Log Worker (Go)
Purpose
Consumes SMTP delivery log events from Kafka (published by Filebeat) and stores them in MongoDB.
How It Works
See C4 Level 3: Fetch Log Worker Components for the full component diagram.
Processing pipeline: Filebeat → Kafka → Consumer → Worker Pool (10 goroutines) → Event Filter → MongoDB Insert
Key behaviors:
- Manual Kafka offset commit with back-pressure (blocks until
task.Done) - Default offset:
earliest(processes historical events) - Only stores:
Sender,Queue,Deferred,Rejectevents - Skips messages matching
bizfly.vnpattern (internal emails) - Auto-creates MongoDB indexes on startup
- Errors reported to Sentry
Log Event Format (Input from Kafka)
{
"Event": "Sender",
"@timestamp": "2026-02-01T10:30:15.000Z",
"SMTP_Action": "delivered",
"from": "hello@yourdomain.com",
"from_name": "Hello",
"recipient": "user@example.com",
"to_name": "User",
"response": "250 2.0.0 OK",
"from_domain": "yourdomain.com",
"Event_ID": "evt_123",
"message_id": "msg_1234567890",
"to_domain": "example.com",
"agent": {
"id": "agent_1",
"type": "filebeat",
"hostname": "host1",
"name": "agent-name",
"ephemeral_id": "eph_1"
}
}
Event Types
| Event Type | Meaning | Stored? |
|---|---|---|
Sender | Email accepted by SMTP server | Yes |
Queue | Email queued for delivery | Yes |
Deferred | Delivery temporarily failed, will retry | Yes |
Reject | Email rejected | Yes |
| Others | Various SMTP events | No (filtered out) |
MongoDB Document (Output)
| MongoDB Field | Source |
|---|---|
event | Event |
timestamp | @timestamp |
action | SMTP_Action |
sender | from |
recipient | recipient |
response | response (full SMTP response) |
from_domain | from_domain |
message_id | message_id |
to_domain | to_domain |
event_id | Event_ID |
agent_* | Agent metadata (type, name, hostname) |
MongoDB Indexes
| Index Name | Fields | Purpose |
|---|---|---|
idx_from_domain_timestamp | from_domain ASC, timestamp DESC | Log queries filtered by domain |
idx_message_id | message_id | Single email log lookup |
idx_event_timestamp | event ASC, timestamp DESC | Event type filtering |
Environment Variables
| Variable | Default | Purpose |
|---|---|---|
KAFKA_BOOTSTRAP_SERVERS | required | Kafka brokers |
LOG_TOPIC | required | Topic to consume (email-logs) |
LOG_GROUP_ID | required | Consumer group ID |
KAFKA_SASL_* | - | SASL authentication |
MONGODB_URI | required | MongoDB connection |
MONGODB_DBNAME | required | Database name |
MONGODB_COLLECTION | required | Collection name |
LOG_WORKER_POOL_SIZE | 10 | Concurrent goroutines |
LOG_PROCESS_TIMEOUT_SECONDS | 10 | Processing timeout |
LOG_MONGO_INSERT_TIMEOUT_SECONDS | 10 | Insert timeout |
STORE_EVENT_TYPES | Sender,Queue,Deferred,Reject | Which events to store |
SKIP_MESSAGE_ID_PATTERN | bizfly.vn | Skip internal messages |
SENTRY_DSN | - | Error tracking |
Key Files
| File | Purpose |
|---|---|
main.go | Entry point, wiring |
config/config.go | Env var loading + filter config |
kafka/consumer.go | Sarama consumer with back-pressure |
worker/pool.go | Goroutine pool + event filtering |
events/log_sender.go | LogEvent struct + MongoDB document conversion |
mongo/client.go | MongoDB insert + index management |
observability/sentry.go | Custom Sentry error reporting |
Deployment
Docker Services
| Service | Dockerfile | Exposed Port |
|---|---|---|
email_transaction_api | ./Dockerfile | 8000 |
sendmail_worker | fetch_log_worker_go/sendmail_worker/Dockerfile | - |
fetch_log_worker | fetch_log_worker_go/Dockerfile | - |
Git Repositories
| Repo | Path | Remote |
|---|---|---|
| API (root) | /email-transactional/ | git@git.paas.vn:devteam/vce/email-transactional.git |
| Combined Workers | /email-transactional/fetch_log_worker_go/ | git@git.paas.vn:devteam/vce/etrans-fetch-log-worker.git |
The standalone
sendmail_worker/repo is deprecated. Source of truth isfetch_log_worker_go/sendmail_worker/.
Status Mapping (API Display Labels)
The API maps raw SMTP statuses to user-friendly labels:
| Raw Status (DB/Logs) | Display Label | Source |
|---|---|---|
queued | Queued | API sets on send |
sender / sent | Sent | SMTP accepted |
delivered / delivery | Delivered | SMTP delivered |
deferred | Delayed | SMTP temporary failure |
bounced / reject / rejected | Bounced | SMTP permanent failure |
Filters on GET /emails accept both raw and display names (delayed -> deferred).