Skip to content

SQL Parser

Core SQL parser powered by sqlglot. Extracts nodes, edges, column usage, and column lineage from SQL statements.

SqlParser

SqlParser(dialect=None)

Parses SQL files into nodes, edges, column usage, and column lineage using sqlglot.

Handles multi-statement files, CTE extraction, column-level scope analysis, transform detection, and end-to-end column lineage tracing. Dialect-aware identifier normalisation ensures consistent casing across Postgres, Snowflake, DuckDB, and other engines.

Initialise with an optional SQL dialect.

Parameters:

Name Type Description Default
dialect str | None

sqlglot dialect string (e.g., 'postgres', 'mysql', 'duckdb'). None for auto-detection.

None
Source code in src/sqlprism/languages/sql.py (lines 46–53):
def __init__(self, dialect: str | None = None):
    """Initialise with an optional SQL dialect.

    Args:
        dialect: sqlglot dialect string (e.g., 'postgres', 'mysql', 'duckdb').
                 None for auto-detection.
    """
    self.dialect = dialect

parse

parse(file_path, file_content, schema=None)

Parse a SQL file into nodes, edges, column usage, and column lineage.

Handles multiple statements per file. Each statement is parsed independently. Errors in one statement don't prevent parsing others.

Parameters:

Name Type Description Default
file_path str

Path to the SQL file (used for naming nodes).

required
file_content str

Raw SQL content.

required
schema dict | None

Optional schema catalog {table: {col: type}} for expanding SELECT * in lineage tracing via qualify_columns.

None

Returns:

Type Description
ParseResult

A ParseResult containing all extracted nodes, edges, column usage records, column lineage chains, and any non-fatal parse errors.

Source code in src/sqlprism/languages/sql.py (lines 66–168):
def parse(self, file_path: str, file_content: str, schema: dict | None = None) -> ParseResult:
    """Parse a SQL file and collect nodes, edges, column usage, and lineage.

    Multi-statement files are supported: each statement is processed on its
    own, so a failure in one statement does not abort the others.

    Args:
        file_path: Location of the SQL file; used to derive node names.
        file_content: The raw SQL text to parse.
        schema: Optional catalog mapping ``{table: {col: type}}`` that lets
            lineage tracing expand ``SELECT *`` through
            ``qualify_columns``.

    Returns:
        A ``ParseResult`` holding every extracted node, edge, column usage
        record, and column lineage chain, together with any non-fatal
        parse errors.
    """
    nodes: list[NodeResult] = []
    edges: list[EdgeResult] = []
    column_usage: list[ColumnUsageResult] = []
    column_lineage: list[ColumnLineageResult] = []
    columns: list[ColumnDefResult] = []
    errors: list[str] = []

    file_stem = self._normalize_identifier(self._smart_file_name(file_path))

    parsed_any = False

    try:
        statements = sqlglot.parse(file_content, dialect=self.dialect)
    except (sqlglot.errors.ParseError, sqlglot.errors.TokenError) as e:
        return ParseResult(language="sql", errors=[f"Parse error: {e}"])

    # Dedup state is shared by every statement in the file so repeated
    # references across statements are only emitted once.
    seen_nodes: set[tuple[str, str, str | None]] = set()
    seen_ctes: set[str] = set()

    for position, statement in enumerate(statements):
        if statement is None:
            continue
        parsed_any = True

        try:
            self._process_statement(
                statement,
                file_stem,
                file_path,
                nodes,
                edges,
                column_usage,
                columns,
                seen_nodes=seen_nodes,
                seen_ctes=seen_ctes,
            )
        except Exception as e:
            errors.append(f"Statement {position}: {type(e).__name__}: {e}")
            continue

        # Column lineage (via sqlglot.lineage) runs as a separate pass so
        # its failures do not discard the structural results above.
        try:
            self._extract_column_lineage(statement, file_stem, file_content, column_lineage, schema=schema)
        except Exception as e:
            errors.append(f"Lineage stmt {position}: {type(e).__name__}: {e}")

    # Emit a file-level query node, or promote an existing bare_query node.
    # Skipped when a CREATE-defined table/view already claims the file name,
    # or when the file produced no parseable statements.
    create_defined: set[str] = set()
    for n in nodes:
        if n.kind in ("table", "view") and (n.metadata or {}).get("create_type"):
            create_defined.add(n.name)

    if parsed_any and file_stem not in create_defined:
        promoted = False
        for i, n in enumerate(nodes):
            if n.name == file_stem and (n.metadata or {}).get("bare_query"):
                # Promote the existing bare_query node by flagging it as
                # the file-level node.
                nodes[i] = NodeResult(
                    kind=n.kind,
                    name=n.name,
                    line_start=n.line_start,
                    line_end=n.line_end,
                    metadata={**(n.metadata or {}), "file_node": True},
                )
                promoted = True
                break
        if not promoted:
            nodes.append(
                NodeResult(
                    kind="query",
                    name=file_stem,
                    metadata={"file_node": True},
                )
            )

    return ParseResult(
        language="sql",
        nodes=nodes,
        edges=edges,
        column_usage=column_usage,
        column_lineage=column_lineage,
        columns=columns,
        errors=errors,
    )