Basic Usage
The `stream` method executes a SQL statement and resolves to an async generator that yields rows one at a time as result pages are fetched. This makes it well suited to large result sets where holding all of the data in memory would be impractical.
```typescript
import { TrinoClient } from "@lakeql/trino-client"

const client = new TrinoClient({
  host: "https://trino.example.com",
  port: 8443,
  auth: { type: "basic", username: "analyst", password: "secret" },
  catalog: "hive",
  schema: "logs",
})

// The type parameter describes one row: a tuple with one element per selected column.
const stream = await client.stream<[string, string, number]>({
  sql: "SELECT timestamp, level, duration FROM request_logs",
})

// Rows are yielded one at a time; later pages are fetched as iteration reaches them.
for await (const [timestamp, level, duration] of stream) {
  if (duration > 5000) {
    console.warn(`Slow request at ${timestamp}: ${duration}ms`)
  }
}
```
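To illustrate what "yields rows one at a time as pages are fetched" means, here is a self-contained sketch of one way an async generator can page lazily. It does not use `@lakeql/trino-client`: `Page`, `FetchPage`, `streamRows`, and `fakeFetch` are hypothetical names, and the integer cursor stands in for the `nextUri` link that Trino's HTTP protocol returns with each batch of results.

```typescript
// Hypothetical page shape: a batch of rows plus a cursor for the next page
// (undefined once the result is exhausted).
interface Page<Row> {
  rows: Row[]
  next?: number
}

// Hypothetical page fetcher; a real client would issue an HTTP request here.
type FetchPage<Row> = (cursor: number) => Promise<Page<Row>>

// Yields rows one at a time. The next page is requested only after every row
// of the current page has been consumed, so at most one page is buffered.
async function* streamRows<Row>(fetchPage: FetchPage<Row>): AsyncGenerator<Row> {
  let cursor: number | undefined = 0
  while (cursor !== undefined) {
    const page: Page<Row> = await fetchPage(cursor)
    for (const row of page.rows) {
      yield row
    }
    cursor = page.next
  }
}

// In-memory fake: three pages of two rows each, recording every fetch.
const pages: Page<[string, number]>[] = [
  { rows: [["a", 1], ["b", 2]], next: 1 },
  { rows: [["c", 3], ["d", 4]], next: 2 },
  { rows: [["e", 5], ["f", 6]] },
]
const fetchLog: number[] = []
const fakeFetch: FetchPage<[string, number]> = async (cursor) => {
  fetchLog.push(cursor)
  return pages[cursor]
}
```

Because the generator asks for the next page only after the consumer has pulled every row of the current one, breaking out of a `for await` loop early also stops any further fetches.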
Streaming with Transforms
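Pass a `transform` function to map each raw row (an array of column values) to a typed object. The function receives the row and the result's column metadata (`Column[]`):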
```typescript
import { TrinoClient } from "@lakeql/trino-client"
import type { Column } from "@lakeql/trino-client"

const client = new TrinoClient({
  host: "https://trino.example.com",
  port: 8443,
  auth: { type: "basic", username: "analyst", password: "secret" },
  catalog: "hive",
  schema: "logs",
})

interface LogEntry {
  timestamp: string
  level: string
  duration: number
}

const stream = await client.stream<LogEntry>({
  sql: "SELECT timestamp, level, duration FROM request_logs",
  // Map each positional row to a LogEntry. The indices follow the SELECT column order;
  // `columns` carries the column metadata and is not needed here.
  transform: (row: unknown[], columns: Column[]) => ({
    timestamp: row[0] as string,
    level: row[1] as string,
    duration: row[2] as number,
  }),
})

// Each yielded value is already a LogEntry.
for await (const entry of stream) {
  if (entry.level === "ERROR") {
    console.error(`Error at ${entry.timestamp}: ${entry.duration}ms`)
  }
}
```
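The transform above relies on positional indices, so reordering the SELECT list would silently put values in the wrong fields. A transform can use the `columns` argument instead. This sketch assumes each column object carries a `name` (Trino's protocol reports a `name` and `type` per column); `ColumnMeta`, `rowToRecord`, and `toLogEntry` are hypothetical local definitions, not exports of the library.

```typescript
// Hypothetical stand-in for the library's Column type; only `name` is read.
interface ColumnMeta {
  name: string
  type: string
}

interface LogEntry {
  timestamp: string
  level: string
  duration: number
}

// Pair each positional value with its column name.
function rowToRecord(row: unknown[], columns: ColumnMeta[]): Record<string, unknown> {
  const record: Record<string, unknown> = {}
  columns.forEach((column, i) => {
    record[column.name] = row[i]
  })
  return record
}

// Same (row, columns) shape as a transform, but independent of SELECT order.
// Throws if a required column is absent instead of producing undefined fields.
function toLogEntry(row: unknown[], columns: ColumnMeta[]): LogEntry {
  const record = rowToRecord(row, columns)
  for (const key of ["timestamp", "level", "duration"]) {
    if (!(key in record)) {
      throw new Error(`missing column: ${key}`)
    }
  }
  return {
    timestamp: String(record["timestamp"]),
    level: String(record["level"]),
    duration: Number(record["duration"]),
  }
}
```

If the library's `Column` type is structurally compatible with `ColumnMeta` (an assumption), a function like this could be passed directly as `transform: toLogEntry`.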
Cancelling a Stream
Pass an `AbortSignal` through the `signal` option to cancel a stream mid-flight. This example stops after the first 1,000 rows:
```typescript
import { TrinoClient } from "@lakeql/trino-client"

const client = new TrinoClient({
  host: "https://trino.example.com",
  port: 8443,
  auth: { type: "basic", username: "analyst", password: "secret" },
  catalog: "hive",
  schema: "logs",
})

const controller = new AbortController()

const stream = await client.stream({
  sql: "SELECT * FROM events",
  signal: controller.signal,
})

let count = 0
for await (const row of stream) {
  count += 1
  // After 1,000 rows, abort the in-flight request and leave the loop.
  if (count >= 1000) {
    controller.abort()
    break
  }
}
```
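How a particular client reacts to cancellation is up to its implementation, but the underlying async-generator mechanics can be sketched on their own. In this hypothetical example, `pagedRows` checks the signal before producing each page, and its `finally` block runs whether the stream completes, throws, or is closed early by a `break` in the consuming loop. A real client might use that point to cancel the server-side query; that is an assumption, represented here only by the hypothetical `onClose` callback.

```typescript
// Hypothetical paged source: checks the AbortSignal before each page and
// reports cleanup through `onClose` when the generator is closed.
async function* pagedRows(
  pageCount: number,
  pageSize: number,
  signal: AbortSignal,
  onClose: () => void,
): AsyncGenerator<number> {
  try {
    for (let page = 0; page < pageCount; page++) {
      // Stop before issuing another (simulated) page request.
      if (signal.aborted) {
        throw new Error("stream aborted")
      }
      for (let i = 0; i < pageSize; i++) {
        yield page * pageSize + i
      }
    }
  } finally {
    // Runs on normal completion, on error, and when the consumer breaks early.
    onClose()
  }
}
```

Consumed with the same abort-then-break loop shown above, the `finally` block runs as soon as the loop exits, because `break` calls the generator's `return()` method.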
When to Use stream vs query
| Scenario | Method |
|---|---|
| Result fits comfortably in memory | query |
| Large or unbounded result sets | stream |
| Need all rows before processing | query |
| Can process rows incrementally | stream |
| ETL pipelines / file exports | stream |
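The choices in this table come down to buffering: a `query`-style result is, in effect, a stream drained into an array. The hypothetical `collect` helper below is a sketch of that idea for any `AsyncIterable`, not a description of how the library implements `query`. Its optional `maxRows` cap (also hypothetical) makes an unexpectedly large result fail fast instead of growing without bound.

```typescript
// Drain an async iterable into an array, as a query-style call would.
// Exceeding `maxRows` throws instead of continuing to buffer rows.
async function collect<T>(source: AsyncIterable<T>, maxRows = Infinity): Promise<T[]> {
  const rows: T[] = []
  for await (const row of source) {
    if (rows.length >= maxRows) {
      throw new Error(`result exceeds ${maxRows} rows; consider streaming instead`)
    }
    rows.push(row)
  }
  return rows
}

// Small in-memory source for illustration.
async function* numbers(n: number): AsyncGenerator<number> {
  for (let i = 0; i < n; i++) {
    yield i
  }
}
```

Memory use of `collect` grows with the result size, which is exactly the cost the `stream` rows of the table avoid.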
Memory Efficiency
With `query`, all pages are collected into a single array before the promise resolves, so memory use grows with the size of the result. With `stream`, each page is fetched on demand and its rows are yielded immediately, so only one page of data is held in memory at a time.
```typescript
import { TrinoClient } from "@lakeql/trino-client"

const client = new TrinoClient({
  host: "https://trino.example.com",
  port: 8443,
  auth: { type: "basic", username: "analyst", password: "secret" },
  catalog: "hive",
  schema: "analytics",
})

// Processing millions of rows without loading them all into memory
const stream = await client.stream<[string, number]>({
  sql: "SELECT user_id, event_count FROM daily_aggregates",
})

let processed = 0
for await (const [userId, count] of stream) {
  console.log(`${userId}: ${count}`)
  processed += 1
}
console.log(`Processed ${processed} rows`)
```
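Streams also compose: each stage of an ETL-style export can itself be an async generator, so memory stays bounded from the query to the output. The sketch below assumes rows shaped like the `[user_id, event_count]` tuples above; `toCsvLines` and `csvField` are hypothetical helpers, and the quoting follows the common RFC 4180 convention of wrapping fields that contain commas, quotes, or line breaks and doubling any embedded quotes.

```typescript
type Row = [string, number]

// Quote a field only when it contains a comma, quote, or line break,
// doubling embedded quotes (RFC 4180 style).
function csvField(value: string | number): string {
  const s = String(value)
  return /[",\r\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s
}

// Turn a row stream into a CSV line stream, one line per row, without buffering.
async function* toCsvLines(rows: AsyncIterable<Row>, header: string[]): AsyncGenerator<string> {
  yield header.map(csvField).join(",")
  for await (const row of rows) {
    yield row.map(csvField).join(",")
  }
}
```

Each yielded line could then be written to a file or HTTP response as it arrives; that sink is left out to keep the sketch free of platform-specific APIs.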