LakeQL
executeWritePipeline

The main entry point for persisting mutation data through the write pipeline.

executeWritePipeline is the primary function for persisting GraphQL mutation input. It converts records to Parquet, uploads them to object storage, and manages the corresponding Hive external table in Trino.

Signature

```typescript
function executeWritePipeline(input: WritePipelineInput): Promise<void>
```

Input

| Property | Type | Description |
| --- | --- | --- |
| records | `Record<string, any>[]` | The records to persist. Accepts any array of objects with string keys. |
| jsonSchema | `JsonSchema` | The JSON Schema describing the record structure. |
| config | `WritePipelineConfig` | Pipeline configuration; see Configuration. |
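This page doesn't say how executeWritePipeline treats record fields that jsonSchema doesn't declare. If you want to catch such mismatches before making the call, a minimal pre-flight check could look like the sketch below. ObjectSchema and findUndeclaredKeys are hypothetical names, not exports of @lakeql/adapters, and the check only inspects top-level keys.

```typescript
// Minimal stand-in for the documented JsonSchema type. Assumption: only the
// top-level `properties` of an object schema matter for this check.
interface ObjectSchema {
  type: "object";
  properties: Record<string, unknown>;
}

// Hypothetical pre-flight helper (not part of @lakeql/adapters): returns
// "recordIndex:key" for every record key the schema does not declare.
function findUndeclaredKeys(
  records: Record<string, unknown>[],
  schema: ObjectSchema,
): string[] {
  const declared = new Set(Object.keys(schema.properties));
  const problems: string[] = [];
  records.forEach((record, index) => {
    for (const key of Object.keys(record)) {
      if (!declared.has(key)) problems.push(`${index}:${key}`);
    }
  });
  return problems;
}

// Example: the second record carries a field the schema does not declare.
const problems = findUndeclaredKeys(
  [
    { event_id: "abc-123", message: "Hello" },
    { event_id: "def-456", extra: 1 },
  ],
  { type: "object", properties: { event_id: { type: "string" }, message: { type: "string" } } },
);
console.log(problems); // [ '1:extra' ]
```

An empty result means every record key is declared; it says nothing about value types, which this sketch leaves to the pipeline.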

Configuration

Properties marked with ? are optional.

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| loadStrategy? | `LoadStrategy` | `"full_load"` | The load strategy for this endpoint. |
| type? | `StorageType` | `"s3"` | Storage adapter type. |
| bucket | `string` | | Bucket name for storing Parquet files. |
| basePath | `string` | | Base path for storing Parquet files. |
| region? | `string` | | Optional region override. |
| endpoint? | `string` | | Optional custom endpoint for S3-compatible storage. |
| table | `Object` | | Hive table definition for DDL management, with the fields below. |
| table.catalog | `string` | | Trino catalog that holds the table. |
| table.schema | `string` | | Schema that holds the table. |
| table.tableName | `string` | | Name of the table. |
| trinoClient | `TrinoClient` | | The Trino client instance for DDL operations. |
| partitioning? | `PartitioningValue` | `true` | Partitioning mode: `true` partitions by write timestamp, `false` disables partitioning, and a string selects field-based or custom partitioning. |
| partitioningFormat? | `PartitioningFormat` | `"year/month/day"` | Partition format granularity. |
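Because several properties have defaults, it can help to make the effective configuration explicit, for example in logs or tests. The sketch below assumes the documented defaults apply only when a property is omitted. PipelineConfigSketch and withDocumentedDefaults are hypothetical, and the library's own types (LoadStrategy, StorageType, PartitioningValue, PartitioningFormat) are replaced with plain strings because this page doesn't list all of their values.

```typescript
// Hypothetical mirror of the configuration properties. The LakeQL types are
// loosened to strings/booleans because this page lists only some values.
interface PipelineConfigSketch {
  loadStrategy?: string;
  type?: string;
  bucket: string;
  basePath: string;
  region?: string;
  endpoint?: string;
  table: { catalog: string; schema: string; tableName: string };
  trinoClient: unknown; // a TrinoClient in real code
  partitioning?: boolean | string;
  partitioningFormat?: string;
}

// Applies the documented defaults to omitted properties. `??` only replaces
// undefined/null, so an explicit `partitioning: false` is kept.
function withDocumentedDefaults(config: PipelineConfigSketch) {
  return {
    ...config,
    loadStrategy: config.loadStrategy ?? "full_load",
    type: config.type ?? "s3",
    partitioning: config.partitioning ?? true,
    partitioningFormat: config.partitioningFormat ?? "year/month/day",
  };
}

const resolved = withDocumentedDefaults({
  bucket: "my-datalake",
  basePath: "analytics/events",
  table: { catalog: "hive", schema: "analytics", tableName: "events" },
  trinoClient: null,
  partitioning: false,
});
console.log(resolved.type, resolved.partitioning); // s3 false
```

Using `??` rather than `||` matters here: `||` would turn an explicit `partitioning: false` back into `true`.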

Usage #

```typescript
import { executeWritePipeline } from "@lakeql/adapters"
import { TrinoClient } from "@lakeql/trino-client"

// Trino client used for the Hive DDL step (dropping and recreating the table)
const trinoClient = new TrinoClient({
  host: "https://trino.example.com",
  port: 8443,
  auth: { type: "basic", username: "admin", password: "secret" },
  catalog: "hive",
})

await executeWritePipeline({
  // Records to serialize to Parquet
  records: [
    {
      event_id: "abc-123",
      message: "Hello",
      timestamp: "2025-01-15T10:30:00Z",
    },
  ],
  // Describes the record structure; usually the CLI-generated json-schema.json
  jsonSchema: {
    type: "object",
    properties: {
      event_id: { type: "string" },
      message: { type: "string" },
      timestamp: { type: "string", format: "date-time" },
    },
  },
  config: {
    loadStrategy: "full_load",
    // MinIO, reached through its S3-compatible endpoint
    type: "minio",
    bucket: "my-datalake",
    basePath: "analytics/events",
    endpoint: "http://localhost:9000",
    // Hive external table managed through Trino
    table: {
      catalog: "hive",
      schema: "analytics",
      tableName: "events",
    },
    trinoClient,
    // partitioning and partitioningFormat are omitted, so their defaults apply
  },
})
```
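The usage above omits partitioning, so the defaults apply: partitioning by write timestamp at year/month/day granularity. This page doesn't show the exact object layout, so the following is only a sketch of how basePath and a date partition could combine into a storage key, assuming Hive-style key=value directories and UTC dates. partitionedKey and the file name part-0.parquet are hypothetical.

```typescript
// Hypothetical helper: composes an object key for one write, assuming
// Hive-style `key=value` directories, UTC date parts, and the default
// "year/month/day" granularity.
function partitionedKey(basePath: string, writtenAt: Date, fileName: string): string {
  const pad = (n: number) => String(n).padStart(2, "0");
  const partition = [
    `year=${writtenAt.getUTCFullYear()}`,
    `month=${pad(writtenAt.getUTCMonth() + 1)}`, // getUTCMonth() is 0-based
    `day=${pad(writtenAt.getUTCDate())}`,
  ].join("/");
  // Trim leading/trailing slashes so "analytics/events/" and "analytics/events" match.
  const base = basePath.replace(/^\/+|\/+$/g, "");
  return `${base}/${partition}/${fileName}`;
}

// A fixed date stands in for the write time here.
console.log(partitionedKey("analytics/events", new Date("2025-01-15T10:30:00Z"), "part-0.parquet"));
// analytics/events/year=2025/month=01/day=15/part-0.parquet
```

With `partitioning: true` the date comes from when the pipeline runs, not from the record's own `timestamp` field, so records written on the same day land in the same partition regardless of their content.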

Pipeline steps

The pipeline executes these steps in order:

  1. Convert to Parquet: records are serialized to a Parquet buffer using the provided JSON Schema.
  2. Upload to storage: the Parquet file is uploaded to the configured bucket under basePath.
  3. Manage Hive DDL: the external table is dropped and recreated so that it points to the new data location.
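Step 3 issues DDL through the configured trinoClient. The page doesn't show the statements, so here is a sketch of what a drop-and-recreate pair could look like for an unpartitioned table, using the `external_location` and `format` table properties of Trino's Hive connector. buildRecreateDdl, the column-type map, and the s3:// location are assumptions; how LakeQL derives column types from the JSON Schema, and how TrinoClient executes SQL, are not covered here.

```typescript
// Mirrors the documented `table` config object.
interface HiveTable {
  catalog: string;
  schema: string;
  tableName: string;
}

// Hypothetical sketch of a "drop and recreate" step for an unpartitioned
// Parquet table. `columns` maps column names to Trino types (an assumption).
// Identifiers and the location are interpolated unescaped; quote them in real code.
function buildRecreateDdl(
  table: HiveTable,
  columns: Record<string, string>,
  location: string,
): [drop: string, create: string] {
  const fqn = `${table.catalog}.${table.schema}.${table.tableName}`;
  const columnSql = Object.entries(columns)
    .map(([name, type]) => `${name} ${type}`)
    .join(", ");
  return [
    `DROP TABLE IF EXISTS ${fqn}`,
    `CREATE TABLE ${fqn} (${columnSql}) ` +
      `WITH (external_location = '${location}', format = 'PARQUET')`,
  ];
}

const [drop, create] = buildRecreateDdl(
  { catalog: "hive", schema: "analytics", tableName: "events" },
  { event_id: "varchar", message: "varchar" },
  "s3://my-datalake/analytics/events/",
);
console.log(drop);
console.log(create);
```

Dropping a Hive table that was created with `external_location` removes only its metastore entry, so the Parquet files in the bucket are left in place when the table is recreated.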

If any step fails, the pipeline stops immediately and throws the error. There is no automatic rollback of earlier steps.
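The fail-fast, no-rollback behavior can be modeled with a small sequential runner. This is a hypothetical sketch (Step and runInOrder are not LakeQL APIs): each step is awaited in turn, the first rejection propagates unchanged, and nothing already done is reverted.

```typescript
// A named async action; the name is only used to report completed steps.
interface Step {
  name: string;
  run: () => Promise<void>;
}

// Runs steps strictly in order. The first rejection stops the loop and is
// rethrown as-is; earlier steps' side effects are not undone.
async function runInOrder(steps: Step[]): Promise<string[]> {
  const completed: string[] = [];
  for (const step of steps) {
    await step.run(); // a rejection here skips every later step
    completed.push(step.name);
  }
  return completed;
}

const log: string[] = [];
runInOrder([
  { name: "convert", run: async () => { log.push("convert"); } },
  { name: "upload", run: async () => { log.push("upload"); throw new Error("upload failed"); } },
  { name: "ddl", run: async () => { log.push("ddl"); } },
]).catch((err: Error) => {
  // "ddl" never ran, and "convert" was not reverted.
  console.log(err.message, log); // upload failed [ 'convert', 'upload' ]
});
```

For callers, this means wrapping executeWritePipeline in try/catch and treating a failure as possibly partial; for example, a Parquet file uploaded in step 2 remains in the bucket if step 3 fails.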

The jsonSchema property is typically generated by the CLI as json-schema.json in each endpoint directory, so you usually don't need to write it by hand.
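If you load the generated file at runtime, a small loader keeps the call site tidy. A sketch, assuming Node.js and that json-schema.json holds a top-level object schema like the one in the usage example; loadEndpointSchema, parseEndpointSchema, and EndpointSchema are hypothetical names.

```typescript
import { readFileSync } from "node:fs";
import { join } from "node:path";

// Minimal shape of the generated schema; the real JsonSchema type may be richer.
interface EndpointSchema {
  type: string;
  properties?: Record<string, { type?: string; format?: string }>;
}

// Parses schema text and checks that it describes objects, as the records are.
function parseEndpointSchema(raw: string): EndpointSchema {
  const parsed = JSON.parse(raw) as EndpointSchema;
  if (parsed.type !== "object") {
    throw new Error(`expected an object schema, got type "${parsed.type}"`);
  }
  return parsed;
}

// Hypothetical loader: reads json-schema.json from an endpoint directory
// produced by the CLI. The directory path is supplied by the caller.
function loadEndpointSchema(endpointDir: string): EndpointSchema {
  return parseEndpointSchema(readFileSync(join(endpointDir, "json-schema.json"), "utf8"));
}
```

You would then pass `jsonSchema: loadEndpointSchema("path/to/endpoint")`, where the path is a placeholder for your endpoint directory; depending on how the library types JsonSchema, a cast may be needed.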
