Working with Mutations #
LakeQL provides a complete mutation pipeline that handles data persistence end-to-end. When you configure mutation support in your endpoint definition, the generated resolver automatically handles validation, Parquet conversion, S3 upload, and Hive table management — no manual stub implementation needed.
The Mutation Pipeline #
When a mutation is executed, the generated resolver runs through this pipeline:
- Input arrives via GraphQL mutation
- Validation — Input is checked against a generated Zod schema (if field validations are configured)
-
Parquet conversion
— Records are serialized into Parquet format via
@lakeql/parquet - S3 upload — The Parquet file is uploaded to the configured base path
- Hive DDL — The external table is recreated to point at the new data
If any step fails, the pipeline stops immediately and propagates the error to the GraphQL error layer.
Enabling Mutations #
Add a mutation configuration to your endpoint definition:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38{
"version": "1.0",
"tableName": "user_events",
"catalog": "hive",
"schema": "analytics",
"fields": [
{
"name": "email",
"type": "String",
"options": {
"required": true,
"validations": [{ "type": "email" }]
}
},
{
"name": "age",
"type": "Integer",
"options": {
"required": false,
"validations": [
{ "type": "min", "value": 0 },
{ "type": "max", "value": 150 }
]
}
},
{ "name": "event_type", "type": "String" },
{ "name": "timestamp", "type": "DateTime" }
],
"mutation": {
"loadStrategy": "full_load",
"type": "s3",
"bucket": "my-datalake",
"basePath": "warehouse/analytics/user_events",
"region": "eu-central-1",
"endpoint": "https://s3.eu-central-1.amazonaws.com"
}
}
Then generate the endpoint:
1
2
lakeql-cli create-endpoint --from-file ./my-endpoint.json
This produces a mutation-schema.ts with a fully wired resolver that invokes the write pipeline, and a validations.ts with the Zod schema for input validation.
Load Strategies #
The loadStrategy field controls how data is persisted. Choose based on your use case:
| Strategy | Best for | Behavior |
|---|---|---|
full_load | Dimension tables, reference data | Replaces all data on every write |
full_load_append | Datasets needing current + history | Maintains latest snapshot and historical log |
append | Event streams, audit logs | Only adds data, never replaces |
See the Load Strategies guide for detailed explanations and file layout diagrams.
Mutation Configuration Reference #
| Property | Type |
|---|---|
| loadStrategy | LoadStrategy |
Load strategy for the write pipeline. | |
| type | StorageType |
Storage adapter type. "s3" | |
| bucket | string |
S3/MinIO bucket name for endpoint data. | |
| basePath | string |
Base path for endpoint data. | |
| region? | string |
Optional region override (falls back to AWS_DEFAULT_REGION env var for S3). | |
| endpoint? | string |
Optional custom endpoint. Required for MinIO, optional for S3. | |
| partitioning? | PartitioningValue |
Partitioning mode. true | |
| partitioningFormat? | string |
Partition path granularity. One of "year/month/day" | |
Partitioning #
The partitioning and partitioningFormat fields control how files are organized in storage. These settings apply to full_load_append and append strategies only — full_load ignores partitioning entirely.
Write-timestamp partitioning (default) #
When partitioning is true (the default), files are partitioned by the current UTC timestamp. A load_timestamp field is added to the endpoint definition with readOnly: true — it appears in the query schema and Hive DDL but is excluded from the mutation input. The write pipeline populates it automatically at runtime. Additionally, load_timestamp_year (Integer) and load_timestamp_month (Integer) fields are injected as materialized partition columns, enabling direct filtering in tools that read Parquet files without Hive metastore awareness (e.g. Jupyter notebooks with PyArrow, Pandas, or DuckDB).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29{
"fields": [
{ "name": "event_type", "type": "String" },
{
"name": "load_timestamp",
"type": "DateTime",
"options": { "readOnly": true }
},
{
"name": "load_timestamp_year",
"type": "Integer",
"options": { "readOnly": true }
},
{
"name": "load_timestamp_month",
"type": "Integer",
"options": { "readOnly": true }
}
],
"mutation": {
"loadStrategy": "append",
"type": "s3",
"bucket": "my-datalake",
"basePath": "warehouse/raw/events",
"partitioning": true,
"partitioningFormat": "year/month/day"
}
}
load_timestamp, load_timestamp_year, and
load_timestamp_month fields automatically when timestamp partitioning is
active. Fields with readOnly: true are queryable via GraphQL but cannot be
provided as mutation input.Disabled partitioning (flat layout) #
When partitioning is false, files are written to a flat directory without date-based directories. No load_timestamp field is added.
1
2
3
4
5
6
7
8
9
10
{
"mutation": {
"loadStrategy": "append",
"type": "s3",
"bucket": "my-datalake",
"basePath": "warehouse/raw/events",
"partitioning": false
}
}
File layout with flat partitioning:
1
2
3
4
5
<basePath>/
all.parquet/
<uuid-1>.parquet
<uuid-2>.parquet
Field-based partitioning #
When partitioning is set to a field name string, records are grouped by the value of that date/datetime field and written to separate partition paths. No load_timestamp field is added.
1
2
3
4
5
6
7
8
9
10
11
{
"mutation": {
"loadStrategy": "append",
"type": "s3",
"bucket": "my-datalake",
"basePath": "warehouse/raw/events",
"partitioning": "event_date",
"partitioningFormat": "year/month"
}
}
The field must exist in every record and contain a valid ISO 8601 date or datetime value. Field names must start with a letter or underscore, contain only alphanumeric characters and underscores, and be 1–64 characters long.
Custom partition format #
For advanced use cases you can define a custom partition format string that combines multiple fields and date components. The format uses / to separate path segments and : to extract date components from a field.
1
2
3
4
5
6
7
8
9
10
{
"mutation": {
"loadStrategy": "append",
"type": "s3",
"bucket": "my-datalake",
"basePath": "warehouse/raw/events",
"partitioning": "customer_id/event_date:year/event_date:month"
}
}
This produces paths like:
1
2
3
4
all.parquet/
customer_id=42/year=2024/month=06/<uuid>.parquet
customer_id=99/year=2024/month=06/<uuid>.parquet
Format syntax #
Each segment is either a plain field name or a field with a date component:
| Segment | Behavior |
|---|---|
customer_id | Extracts the raw field value → customer_id=<value> |
event_date:year | Parses the field as ISO date, extracts year → year=2024 |
event_date:month | Extracts month (zero-padded) → month=06 |
event_date:day | Extracts day (zero-padded) → day=15 |
ts:hour | Extracts hour (zero-padded) → hour=08 |
ts:minute | Extracts minute (zero-padded) → minute=30 |
ts:second | Extracts second (zero-padded) → second=07 |
Supported date components: year, month, day, hour, minute, second.
Records are grouped by their resolved partition key — records with the same key are written to the same Parquet file.
When using a custom format, the partitioningFormat field is ignored (the format is fully determined by the custom string). No load_timestamp field is added.
PartitionFieldError is thrown if any record has a missing, null, or
unparseable field value.partitioning and partitioningFormat fields are not included in the
generated config when loadStrategy is full_load, since full-load always
overwrites the entire dataset as a single file.Storage Configuration #
The type field in the mutation configuration selects the storage adapter used by the write pipeline.
S3 (default) #
S3 is the default storage backend. Credentials and region are read from standard AWS environment variables:
| Environment Variable | Description |
|---|---|
AWS_ACCESS_KEY_ID | AWS access key |
AWS_SECRET_ACCESS_KEY | AWS secret key |
AWS_DEFAULT_REGION | Default region (fallback) |
AWS_ENDPOINT_URL | Custom S3 endpoint (fallback) |
1
2
3
4
5
6
7
8
9
{
"mutation": {
"loadStrategy": "full_load",
"type": "s3",
"bucket": "my-datalake",
"basePath": "warehouse/analytics/user_events"
}
}
MinIO (local development) #
MinIO is supported for local development and self-hosted S3-compatible storage. The endpoint field is required when using MinIO. Credentials are read from MinIO-specific environment variables:
| Environment Variable | Description |
|---|---|
MINIO_ACCESS_KEY_ID | MinIO access key |
MINIO_SECRET_ACCESS_KEY | MinIO secret key |
MINIO_ENDPOINT | MinIO server URL (fallback) |
1
2
3
4
5
6
7
8
9
10
{
"mutation": {
"loadStrategy": "full_load",
"type": "minio",
"bucket": "local-datalake",
"basePath": "warehouse/analytics/user_events",
"endpoint": "http://localhost:9000"
}
}
type is "minio", the endpoint field is required either in the
configuration or via the MINIO_ENDPOINT environment variable.Generated Resolver #
The generated mutation-schema.ts integrates with @lakeql/adapters to execute the full write pipeline. Here's what the generated code looks like conceptually:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46import { builder } from "@lakeql/api"
import { executeWritePipeline } from "@lakeql/adapters"
import { validationSchema } from "./validations"
import jsonSchema from "./json-schema.json"
// GraphQL input type generated from field definitions
const UserEventsInput = builder.inputType("UserEventsInput", {
fields: (t) => ({
email: t.string({ required: true }),
age: t.int({ required: false }),
event_type: t.string({ required: false }),
timestamp: t.string({ required: false }),
}),
})
builder.mutationField("writeUserEvents", (t) =>
t.field({
type: "Boolean",
args: {
input: t.arg({ type: [UserEventsInput], required: true }),
},
resolve: async (_root, args, context) => {
// 1. Validate input against Zod schema
const validated = validationSchema.parse(args.input)
// 2. Execute the write pipeline
await executeWritePipeline({
records: validated,
jsonSchema,
config: {
loadStrategy: "full_load",
basePath: "warehouse/analytics/user_events",
s3: context.s3Config,
table: {
catalog: "hive",
schema: "analytics",
tableName: "user_events",
},
},
})
return true
},
})
)
create-endpoint. The
example above shows what the generated resolver does under the hood.Input Validation #
When fields have options.validations configured, a validations.ts file is generated with a Zod schema. The mutation resolver validates input before invoking the pipeline:
- If validation passes, the pipeline proceeds
- If validation fails, Zod errors are returned as GraphQL field errors without any data being written
See the create-endpoint reference for the full list of available validations.
Write Permission Model #
LakeQL's write permission model is default-deny. Users must have explicit Mutation rules to perform any write operation.
This design is intentional:
- Write operations are more sensitive than reads
- Writes often execute via a shared system user in Trino
- Each table typically has a defined data owner (source system)
- Explicit rules enforce data ownership boundaries
Configuring Write Permissions #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import { defineConfig } from "@lakeql/api"
import { allConfigs } from "./config-registry"
export default defineConfig({
allConfigs,
permissions: [
{
name: "ingestion-service",
useSystemUser: true,
permissions: {
Query: [{ catalog: "hive", schema: "raw", tables: ["*"] }],
Mutation: [
{
catalog: "hive",
schema: "raw",
tables: ["events", "user_actions"],
},
],
},
},
{
name: "admin-user",
useSystemUser: false,
permissions: {
Query: [{ catalog: "hive", schema: "*", tables: ["*"] }],
Mutation: [{ catalog: "hive", schema: "config", tables: ["*"] }],
},
},
],
})
Permission Resolution #
For write operations, the permission check follows this logic:
- No authenticated user → Denied
-
No
Mutationrules for this user → Denied - Rules exist but don't match catalog/schema/table → Denied
- Matching rule found → Allowed
System User Impersonation #
Write statements in Trino often need to execute as a system user with broad permissions, even when the request originates from a regular user.
When useSystemUser: true is set for a permission entry, the Trino client uses system credentials for that user's write operations instead of their own identity. This means:
- Trino sees the system user as the executor
- The application layer controls which tables a given client can write to
- Audit trails should track both the requesting user and the executing identity
1
2
3
4
5
6
7
8
9
10
11
{
name: "etl-pipeline",
useSystemUser: true, // Execute writes as system user in Trino
permissions: {
Query: [],
Mutation: [
{ catalog: "hive", schema: "staging", tables: ["*"] }
]
}
}
useSystemUser is false, the write is executed with the authenticated
user's Trino credentials. This requires that user to exist in Trino with
appropriate table privileges.Error Handling #
The mutation pipeline uses fail-fast error propagation:
| Failure point | Behavior |
|---|---|
| Zod validation | Returns field errors to GraphQL, no data written |
| Parquet conversion | Error propagated, no S3 or DDL operations attempted |
| S3 upload | Error propagated, no DDL operations attempted |
| Hive DDL | Error propagated (with rollback attempt for full_load_append) |
For full_load_append, if one of the two Hive table creations fails, the system attempts a best-effort rollback of both tables before propagating the original error.
Custom Mutations #
For use cases not covered by the pipeline (e.g., custom SQL, cross-table operations), you can still create manual mutation resolvers. See the Custom Resolvers guide.