executeWritePipeline is the primary function for persisting GraphQL mutation input. It converts records to Parquet, uploads them to object storage, and manages the corresponding Hive external table in Trino.
Signature
function executeWritePipeline(input: WritePipelineInput): Promise<void>
Input
| Property | Type | Description |
|---|---|---|
| records | Record<string, any>[] | The records to persist. Accepts any array of objects with string keys. |
| jsonSchema | JsonSchema | The JSON Schema describing the record structure. |
| config | WritePipelineConfig | Pipeline configuration. |
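As a rough picture of the input shape, the table can be read as the interface below. This is only a sketch: the `JsonSchema` and `WritePipelineConfig` aliases are loose placeholders so the snippet compiles on its own, and the real types exported by the library are likely stricter.

```typescript
// Placeholder aliases (assumptions): the library's real JsonSchema and
// WritePipelineConfig types are richer than these.
type JsonSchema = Record<string, unknown>;
type WritePipelineConfig = Record<string, unknown>;

// Shape inferred from the Input table.
interface WritePipelineInput {
  records: Record<string, any>[]; // any array of objects with string keys
  jsonSchema: JsonSchema;         // describes the structure of each record
  config: WritePipelineConfig;    // storage, table, and Trino settings
}

// Example value; the config here is deliberately partial because the
// placeholder type does not enforce the required properties.
const exampleInput: WritePipelineInput = {
  records: [{ event_id: "abc-123", message: "Hello" }],
  jsonSchema: {
    type: "object",
    properties: {
      event_id: { type: "string" },
      message: { type: "string" },
    },
  },
  config: { bucket: "my-datalake", basePath: "analytics/events" },
};
```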
Configuration
| Property | Type | Default | Description |
|---|---|---|---|
| loadStrategy? | LoadStrategy | "full_load" | The load strategy for this endpoint. |
| type? | StorageType | "s3" | Storage adapter type. |
| bucket | string | | Bucket name for storing Parquet files. |
| basePath | string | | The base path for storing Parquet files. |
| region? | string | | Optional region override. |
| endpoint? | string | | Optional custom endpoint for S3-compatible storage. |
| table | Object | | Hive table definition for DDL management. |
| trinoClient | TrinoClient | | The Trino client instance for DDL operations. |
| partitioning? | PartitioningValue | true | Partitioning mode. |
| partitioningFormat? | PartitioningFormat | "year/month/day" | Partition format granularity. |
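To make the optional properties concrete, here is a minimal sketch of how fallback values could be resolved. It assumes the values listed for the optional properties ("full_load", "s3", true, "year/month/day") are their defaults. The `OptionalSettings` type and the `resolveOptionalSettings` helper are hypothetical names used only for illustration; they are not part of the library, and the real `PartitioningValue` type may accept more than a boolean.

```typescript
// Hypothetical local type (assumption): only the optional properties that
// have a listed value in the configuration table.
interface OptionalSettings {
  loadStrategy?: string;
  type?: string;
  partitioning?: boolean;
  partitioningFormat?: string;
}

// Values listed in the table, assumed here to be the defaults.
const DOCUMENTED_DEFAULTS: Required<OptionalSettings> = {
  loadStrategy: "full_load",
  type: "s3",
  partitioning: true,
  partitioningFormat: "year/month/day",
};

// Illustrative helper: an explicit value wins; a missing or undefined value
// falls back to the documented one.
function resolveOptionalSettings(
  settings: OptionalSettings = {},
): Required<OptionalSettings> {
  return {
    loadStrategy: settings.loadStrategy ?? DOCUMENTED_DEFAULTS.loadStrategy,
    type: settings.type ?? DOCUMENTED_DEFAULTS.type,
    partitioning: settings.partitioning ?? DOCUMENTED_DEFAULTS.partitioning,
    partitioningFormat:
      settings.partitioningFormat ?? DOCUMENTED_DEFAULTS.partitioningFormat,
  };
}
```

Under these assumptions, a config that sets only `type: "minio"` would still use "full_load", partitioning enabled, and the "year/month/day" format.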
Usage
import { executeWritePipeline } from "@lakeql/adapters"
import { TrinoClient } from "@lakeql/trino-client"

// Trino client used by the pipeline for DDL operations
const trinoClient = new TrinoClient({
  host: "https://trino.example.com",
  port: 8443,
  auth: { type: "basic", username: "admin", password: "secret" },
  catalog: "hive",
})

await executeWritePipeline({
  // Records to persist
  records: [
    {
      event_id: "abc-123",
      message: "Hello",
      timestamp: "2025-01-15T10:30:00Z",
    },
  ],
  // JSON Schema describing the record structure
  jsonSchema: {
    type: "object",
    properties: {
      event_id: { type: "string" },
      message: { type: "string" },
      timestamp: { type: "string", format: "date-time" },
    },
  },
  config: {
    loadStrategy: "full_load",
    type: "minio",
    bucket: "my-datalake",
    basePath: "analytics/events",
    // Custom endpoint for S3-compatible storage
    endpoint: "http://localhost:9000",
    // Hive table managed by the pipeline
    table: {
      catalog: "hive",
      schema: "analytics",
      tableName: "events",
    },
    trinoClient,
  },
})
Pipeline steps
The pipeline executes these steps in order:
- Convert to Parquet: Records are serialized to a Parquet buffer using the provided JSON Schema.
- Upload to storage: The Parquet file is uploaded to the configured bucket/path.
- Manage Hive DDL: The external table is dropped and recreated pointing to the new data location.
If any step fails, the pipeline stops immediately and throws the error. There is no automatic rollback of earlier steps.
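The fail-fast behavior can be pictured as a plain sequential loop. The sketch below is not the library's implementation: `PipelineStep` and `runStepsInOrder` are hypothetical names, and the steps are stand-ins for the real conversion, upload, and DDL work.

```typescript
// Hypothetical step shape for illustration; not the library's internal type.
interface PipelineStep {
  name: string;
  run: () => Promise<void>;
}

// Runs the steps one after another. The first rejection propagates to the
// caller, later steps never start, and completed steps are not undone.
// `completed` records the names of the steps that finished.
async function runStepsInOrder(
  steps: PipelineStep[],
  completed: string[] = [],
): Promise<string[]> {
  for (const step of steps) {
    await step.run();
    completed.push(step.name);
  }
  return completed;
}
```

One consequence worth planning for, assuming the real pipeline behaves like this loop: if the DDL step fails after a successful upload, the uploaded Parquet file stays in storage, so callers that need a clean state have to handle cleanup or retries themselves in a try/catch around executeWritePipeline.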
The jsonSchema property is typically generated by the CLI as json-schema.json in each endpoint directory, so you don't need to write it by hand.