Data ingestion — bRRAIn Docs

Record structure, single and bulk storage, metadata and context, error handling, and batch patterns.


Record structure

A record is any map (Go) or object (other SDKs) conforming to these conventions:

| Field | Required | Notes |
| --- | --- | --- |
| type | Yes | A string identifying the record's domain type (e.g., article, invoice, telemetry) |
| Other fields | No | Any schema-agnostic payload; bRRAIn indexes all strings and numbers |

Optional metadata is attached via sdk.WithContext(...):

ws.Store(record, sdk.WithContext(map[string]string{
    "source":       "crm",
    "customer_id":  "c-12345",
    "classification": "internal",
}))

Storing records

Single

id, err := ws.Store(record)

Returns a record ID on success. Errors are typed; check for sdk.ErrValidation, sdk.ErrRateLimit, or sdk.ErrGatewayBlocked (e.g., with errors.Is).

Bulk

ids, err := ws.StoreBatch(records,
    sdk.WithBatchSize(500),
    sdk.WithConcurrency(8),
)

The SDK automatically chunks requests and streams them over HTTP/2. Partial failures return an errorsByIndex map so you can retry only the failures.

Streaming

For high-throughput telemetry (IoT, logs):

stream, err := ws.StoreStream(ctx)
if err != nil {
    log.Fatalf("open stream: %v", err)
}
defer stream.Close()

for event := range events {
    if err := stream.Send(event); err != nil {
        log.Printf("send error: %v", err)
    }
}

Streams apply back-pressure when the Handler is saturated, so your producer naturally slows rather than dropping records.

Context and metadata

Context metadata is not indexed by default — it's preserved verbatim for provenance. To make a context field searchable, list it in the workspace's indexed_context setting:

brrain workspace update legal-team --indexed-context=case_id,jurisdiction

Error handling and retries

The SDK retries transient failures automatically (network timeouts, 502/503/504, rate-limit backoff). You can customize:

client := sdk.NewClient(apiKey,
    sdk.WithRetry(sdk.RetryPolicy{
        MaxAttempts: 5,
        Backoff:     sdk.BackoffExponential(100 * time.Millisecond, 2*time.Second),
    }),
)

Typed errors you should handle in application code:

  • sdk.ErrValidation — the record payload failed schema checks
  • sdk.ErrRateLimit — slow down or upgrade tier
  • sdk.ErrGatewayBlocked — the Gate 1 / Gate 2 policy engine rejected the content
  • sdk.ErrAuthExpired — refresh credentials

Batch ingestion patterns

Pattern A — ETL from a relational DB

rows, err := db.Query("SELECT id, data FROM records WHERE created_at > $1", lastSync)
if err != nil {
    log.Fatalf("query: %v", err)
}
defer rows.Close()

batch := make([]map[string]any, 0, 500)
for rows.Next() {
    var id int
    var data []byte
    if err := rows.Scan(&id, &data); err != nil {
        log.Printf("scan: %v", err)
        continue
    }
    var record map[string]any
    if err := json.Unmarshal(data, &record); err != nil {
        log.Printf("decode record %d: %v", id, err)
        continue
    }
    batch = append(batch, record)
    if len(batch) == 500 {
        if _, err := ws.StoreBatch(batch); err != nil {
            log.Printf("store batch: %v", err)
        }
        batch = batch[:0] // reuse the backing array
    }
}
if len(batch) > 0 {
    if _, err := ws.StoreBatch(batch); err != nil {
        log.Printf("store batch: %v", err)
    }
}

Pattern B — event-driven ingestion (Kafka)

for {
    msg, err := reader.ReadMessage(ctx)
    if err != nil {
        break // ctx cancelled or reader closed
    }
    var record map[string]any
    if err := json.Unmarshal(msg.Value, &record); err != nil {
        continue // skip malformed messages
    }
    ws.Store(record, sdk.WithContext(map[string]string{
        "topic":  msg.Topic,
        "offset": strconv.FormatInt(msg.Offset, 10),
    }))
}

Pattern C — incremental sync (CDC)

Subscribe to PostgreSQL LISTEN/NOTIFY or MongoDB change streams and mirror writes directly. See Handler adapters for production-ready implementations.

Data transformation

If you need to transform records before ingestion, register a transformer:

client.Transformers.Register("redact-pii", func(r map[string]any) map[string]any {
    if email, ok := r["email"].(string); ok {
        r["email_hash"] = sha256sum(email)
        delete(r, "email")
    }
    return r
})

ws.Store(record, sdk.WithTransformers("redact-pii"))

Transformers run in-process before the record reaches the Handler, so PII never leaves your application boundary.