# Data ingestion — bRRAIn Docs

Record structure, single and bulk storage, metadata and context, error handling, and batch patterns.
## Record structure
A record is any map (Go) or object (other SDKs) conforming to these conventions:
| Field | Required | Notes |
| --- | --- | --- |
| type | Yes | A string identifying the record's domain type (e.g., article, invoice, telemetry) |
| Other fields | No | Any schema-agnostic payload; bRRAIn indexes all strings and numbers |
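For concreteness, a minimal Go record is just a map with the required `type` key plus arbitrary payload fields (the field names below are illustrative, not part of any schema):

```go
package main

import "fmt"

func main() {
	// "type" is the only required field; everything else is free-form payload.
	record := map[string]any{
		"type":  "article",
		"title": "Q3 earnings recap",
		"words": 1280, // strings and numbers are indexed
	}
	fmt.Println(record["type"]) // article
}
```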
Optional metadata is attached via `sdk.WithContext(...)`:

```go
ws.Store(record, sdk.WithContext(map[string]string{
	"source":         "crm",
	"customer_id":    "c-12345",
	"classification": "internal",
}))
```
## Storing records

### Single

```go
id, err := ws.Store(record)
```
Returns a record ID on success. Errors are typed; check for `sdk.ErrValidation`, `sdk.ErrRateLimit`, and `sdk.ErrGatewayBlocked` with `errors.Is`.
### Bulk

```go
ids, err := ws.StoreBatch(records,
	sdk.WithBatchSize(500),
	sdk.WithConcurrency(8),
)
```
The SDK automatically chunks requests and streams them over HTTP/2. Partial failures return an `errorsByIndex` map so you can retry only the failures.
### Streaming
For high-throughput telemetry (IoT, logs):
```go
stream, err := ws.StoreStream(ctx)
if err != nil {
	log.Fatalf("open stream: %v", err)
}
defer stream.Close()

for event := range events {
	if err := stream.Send(event); err != nil {
		log.Printf("send error: %v", err)
	}
}
```
Streams apply back-pressure when the Handler is saturated: your producer naturally slows rather than dropping records.
## Context and metadata

Context metadata is not indexed by default; it is preserved verbatim for provenance. To make a context field searchable, list it in the workspace's `indexed_context` setting:

```shell
brrain workspace update legal-team --indexed-context=case_id,jurisdiction
```
## Error handling and retries

The SDK retries transient failures automatically (network timeouts, 502/503/504, rate-limit backoff). You can customize the policy:

```go
client := sdk.NewClient(apiKey,
	sdk.WithRetry(sdk.RetryPolicy{
		MaxAttempts: 5,
		Backoff:     sdk.BackoffExponential(100*time.Millisecond, 2*time.Second),
	}),
)
```
Typed errors you should handle in application code:
- `sdk.ErrValidation`: the record payload failed schema checks
- `sdk.ErrRateLimit`: slow down or upgrade tier
- `sdk.ErrGatewayBlocked`: the Gate 1 / Gate 2 policy engine rejected the content
- `sdk.ErrAuthExpired`: refresh credentials
## Batch ingestion patterns

### Pattern A — ETL from a relational DB
```go
rows, err := db.Query("SELECT id, data FROM records WHERE created_at > $1", lastSync)
if err != nil {
	log.Fatalf("query: %v", err)
}
defer rows.Close()

batch := make([]map[string]any, 0, 500)
for rows.Next() {
	var id int
	var data []byte
	if err := rows.Scan(&id, &data); err != nil {
		log.Printf("scan: %v", err)
		continue
	}
	var record map[string]any
	if err := json.Unmarshal(data, &record); err != nil {
		log.Printf("unmarshal record %d: %v", id, err)
		continue
	}
	batch = append(batch, record)
	if len(batch) == 500 {
		if _, err := ws.StoreBatch(batch); err != nil {
			log.Printf("store batch: %v", err)
		}
		batch = batch[:0]
	}
}
if err := rows.Err(); err != nil {
	log.Printf("rows: %v", err)
}
if len(batch) > 0 {
	if _, err := ws.StoreBatch(batch); err != nil {
		log.Printf("store batch: %v", err)
	}
}
```
### Pattern B — event-driven ingestion (Kafka)

```go
for {
	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		break // context cancelled or reader closed
	}
	var record map[string]any
	if err := json.Unmarshal(msg.Value, &record); err != nil {
		continue // skip payloads that aren't JSON objects
	}
	ws.Store(record, sdk.WithContext(map[string]string{
		"topic":  msg.Topic,
		"offset": strconv.FormatInt(msg.Offset, 10),
	}))
}
```
### Pattern C — incremental sync (CDC)

Subscribe to PostgreSQL LISTEN/NOTIFY or MongoDB change streams and mirror writes directly. See Handler adapters for production-ready implementations.
## Data transformation

If you need to transform records before ingestion, register a transformer:

```go
client.Transformers.Register("redact-pii", func(r map[string]any) map[string]any {
	if email, ok := r["email"].(string); ok {
		r["email_hash"] = sha256sum(email) // hash before the raw address is dropped
		delete(r, "email")
	}
	return r
})

ws.Store(record, sdk.WithTransformers("redact-pii"))
```
Transformers run in-process before the record reaches the Handler, so PII never leaves your application boundary.
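`sha256sum` in the example above is an application-side helper, not part of the SDK; one plausible implementation (an assumption, using only the standard library) hashes to a lowercase hex string:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sha256sum returns the lowercase hex SHA-256 digest of s.
func sha256sum(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}

func main() {
	h := sha256sum("user@example.com")
	fmt.Println(len(h)) // 64 hex characters
}
```

Hashing rather than deleting outright keeps a stable join key (`email_hash`) for deduplication while the raw address never leaves your process.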