Skip to content

Indexer (Orchestrator)

Orchestrates the indexing pipeline: scan repos, checksum files, dispatch to parsers, store results.

Indexer

Indexer(graph)

Orchestrates parsing and indexing across repos.

Connects language parsers to the GraphDB storage layer. Handles file scanning, checksum diffing, dialect resolution, and batch insertion of parse results. Supports plain SQL repos, sqlmesh projects, and dbt projects.

Initialise the indexer.

Parameters:

Name Type Description Default
graph GraphDB

The GraphDB instance to write parsed data into.

required
Source code in src/sqlprism/core/indexer.py
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(self, graph: GraphDB):
    """Create an indexer that persists parse results into ``graph``.

    Args:
        graph: The ``GraphDB`` storage layer to write parsed data to.
    """
    self.graph = graph
    # Renderers are created lazily by their respective getters.
    self._sqlmesh_renderer = None
    self._dbt_renderer = None
    # One SqlParser per dialect; the ``None`` key holds the default parser.
    self._parser_cache: dict[str | None, SqlParser] = {}
    # Maps abs_path -> (mtime, size, checksum) so files whose mtime and
    # size are unchanged can skip having their bytes re-read entirely.
    self._file_stat_cache: OrderedDict[str, tuple[float, int, str]] = OrderedDict()

get_parser

get_parser(dialect=None)

Get or create a SqlParser for the given dialect.

Source code in src/sqlprism/core/indexer.py
51
52
53
54
55
def get_parser(self, dialect: str | None = None) -> SqlParser:
    """Return the cached SqlParser for ``dialect``, creating it on first use."""
    try:
        return self._parser_cache[dialect]
    except KeyError:
        parser = SqlParser(dialect=dialect)
        self._parser_cache[dialect] = parser
        return parser

get_sqlmesh_renderer

get_sqlmesh_renderer(dialect=None)

Get or create a SqlMeshRenderer with the correct dialect parser.

Source code in src/sqlprism/core/indexer.py
57
58
59
60
61
62
63
def get_sqlmesh_renderer(self, dialect: str | None = None):
    """Return a cached SqlMeshRenderer, rebuilding it if the dialect changed."""
    from sqlprism.languages.sqlmesh import SqlMeshRenderer

    current = self._sqlmesh_renderer
    # A truthy ``dialect`` that differs from the cached parser's invalidates it.
    dialect_changed = current is not None and dialect and current.sql_parser.dialect != dialect
    if current is None or dialect_changed:
        current = SqlMeshRenderer(sql_parser=self.get_parser(dialect))
        self._sqlmesh_renderer = current
    return current

reindex_repo

reindex_repo(
    name, path, dialect=None, dialect_overrides=None
)

Reindex a single repo by scanning for SQL files.

Compares file checksums against the stored index to determine added, changed, and deleted files. Only changed files are re-parsed.

Parameters:

Name Type Description Default
name str

Repo name in the index.

required
path str | Path

Absolute path to the repo root.

required
dialect str | None

Default SQL dialect (e.g. "starrocks", "athena").

None
dialect_overrides dict[str, str] | None

Per-path dialect overrides as {glob_pattern: dialect}, e.g. {"athena/": "athena", "starrocks/**": "starrocks"}.

None

Returns:

Type Description
dict

Stats dict with keys files_scanned, files_added, files_changed, files_removed, nodes_added, edges_added, column_usage_added, columns_added, lineage_chains, column_usage_dropped, parse_errors, and phantoms_cleaned.

Source code in src/sqlprism/core/indexer.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def reindex_repo(
    self,
    name: str,
    path: str | Path,
    dialect: str | None = None,
    dialect_overrides: dict[str, str] | None = None,
) -> dict:
    """Reindex a single repo by scanning for SQL files.

    Compares file checksums against the stored index to determine
    added, changed, and deleted files. Only changed files are re-parsed.

    Args:
        name: Repo name in the index.
        path: Absolute path to the repo root.
        dialect: Default SQL dialect (e.g. ``"starrocks"``, ``"athena"``).
        dialect_overrides: Per-path dialect overrides as
            ``{glob_pattern: dialect}``, e.g.
            ``{"athena/": "athena", "starrocks/**": "starrocks"}``.

    Returns:
        Stats dict with keys ``files_scanned``, ``files_added``,
        ``files_changed``, ``files_removed``, ``nodes_added``,
        ``edges_added``, ``column_usage_added``, ``columns_added``,
        ``lineage_chains``, ``column_usage_dropped``, ``parse_errors``,
        and ``phantoms_cleaned``.
    """
    path = Path(path).resolve()
    repo_id = self.graph.upsert_repo(name, str(path), repo_type="sql")

    # Get current checksums from DB
    stored_checksums = self.graph.get_file_checksums(repo_id)

    # Scan filesystem
    current_files = self._scan_files(path)

    # Determine what changed
    changed = []
    added = []
    for rel_path, checksum in current_files.items():
        if rel_path not in stored_checksums:
            added.append(rel_path)
        elif stored_checksums[rel_path] != checksum:
            changed.append(rel_path)

    deleted = [p for p in stored_checksums if p not in current_files]

    # Parse and insert changed/added files
    stats = {
        "files_scanned": len(current_files),
        "files_added": len(added),
        "files_changed": len(changed),
        "files_removed": len(deleted),
        "nodes_added": 0,
        "edges_added": 0,
        "column_usage_added": 0,
        "columns_added": 0,
        "lineage_chains": 0,
        "column_usage_dropped": 0,
        "parse_errors": [],
    }

    # Delete truly removed files in a transaction
    if deleted:
        with self.graph.write_transaction():
            for rel_path in deleted:
                self.graph.delete_file_data(repo_id, rel_path)

    # Build schema catalog from existing index for SELECT * expansion
    schema_catalog = self.graph.get_table_columns(repo_id) or None

    # Changed + added files: delete old + insert new in same transaction
    # so a crash never leaves a file in a "deleted but not yet reinserted" state
    changed_set = set(changed)
    for rel_path in changed + added:
        # Resolve dialect for this file
        file_dialect = _resolve_dialect(rel_path, dialect, dialect_overrides)
        parser = self.get_parser(file_dialect)

        full_path = path / rel_path
        try:
            content = full_path.read_text(errors="replace")
        except OSError:  # PermissionError is an OSError subclass
            logger.warning("Cannot read file %s — skipping", full_path)
            stats["parse_errors"].append(f"{rel_path}: unreadable (OS/permission error)")
            continue
        checksum = current_files[rel_path]

        # Parse — pass schema catalog for SELECT * lineage expansion
        result = parser.parse(rel_path, content, schema=schema_catalog)
        if result.errors:
            for err in result.errors:
                stats["parse_errors"].append(f"{rel_path}: {err}")

        # Wrap per-file delete + insert in a transaction for atomicity
        with self.graph.write_transaction():
            if rel_path in changed_set:
                self.graph.delete_file_data(repo_id, rel_path)
            file_id = self.graph.insert_file(repo_id, rel_path, "sql", checksum)
            self._insert_parse_result(result, file_id, repo_id, stats)

    # Clean up phantom nodes that now have real counterparts
    phantoms_cleaned = self.graph.cleanup_phantoms()
    stats["phantoms_cleaned"] = phantoms_cleaned

    # Update repo metadata
    commit, branch = self._get_git_info(path)
    self.graph.update_repo_metadata(repo_id, commit=commit, branch=branch)

    self._run_convention_inference(repo_id, project_path=path)
    self.graph.refresh_property_graph()
    self.graph.clear_snippet_cache()
    return stats

reindex_sqlmesh

reindex_sqlmesh(
    repo_name,
    project_path,
    env_file=None,
    variables=None,
    dialect="athena",
    sqlmesh_command="uv run python",
    venv_dir=None,
)

Index a sqlmesh project by rendering all models first.

Uses SqlMeshRenderer to render every model via subprocess, then parses the rendered SQL and inserts results into the graph.

When source files haven't changed (matching fingerprint), skips the expensive rendering subprocess and re-parses cached SQL with the current schema catalog. This allows schema enrichment (SELECT * expansion) to converge while avoiding redundant rendering.

Parameters:

Name Type Description Default
repo_name str

Repo name in the index.

required
project_path str | Path

Path to the sqlmesh project directory (containing config.yaml).

required
env_file str | Path | None

Optional .env file to source before rendering.

None
variables dict[str, str | int] | None

Extra sqlmesh variables (e.g. {"GRACE_PERIOD": 7}).

None
dialect str

SQL dialect for rendering (default "athena").

'athena'
sqlmesh_command str

Command to invoke Python in the sqlmesh venv.

'uv run python'
venv_dir str | Path | None

Directory containing .venv. Auto-detected if not set.

None

Returns:

Type Description
dict

Stats dict with keys models_rendered, models_skipped, models_removed, render_cached, nodes_added, edges_added, column_usage_added, columns_added, lineage_chains, and phantoms_cleaned.

Source code in src/sqlprism/core/indexer.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def reindex_sqlmesh(
    self,
    repo_name: str,
    project_path: str | Path,
    env_file: str | Path | None = None,
    variables: dict[str, str | int] | None = None,
    dialect: str = "athena",
    sqlmesh_command: str = "uv run python",
    venv_dir: str | Path | None = None,
) -> dict:
    """Index a sqlmesh project by rendering all models first.

    Uses ``SqlMeshRenderer`` to render every model via subprocess,
    then parses the rendered SQL and inserts results into the graph.

    When source files haven't changed (matching fingerprint), skips the
    expensive rendering subprocess and re-parses cached SQL with the
    current schema catalog. This allows schema enrichment (SELECT *
    expansion) to converge while avoiding redundant rendering.

    Args:
        repo_name: Repo name in the index.
        project_path: Path to the sqlmesh project directory
            (containing ``config.yaml``).
        env_file: Optional ``.env`` file to source before rendering.
        variables: Extra sqlmesh variables (e.g. ``{"GRACE_PERIOD": 7}``).
        dialect: SQL dialect for rendering (default ``"athena"``).
        sqlmesh_command: Command to invoke Python in the sqlmesh venv.
        venv_dir: Directory containing ``.venv``. Auto-detected if not set.

    Returns:
        Stats dict with keys ``models_rendered``, ``models_skipped``,
        ``models_removed``, ``render_cached``, ``nodes_added``,
        ``edges_added``, ``column_usage_added``, ``columns_added``,
        ``lineage_chains``, and ``phantoms_cleaned``.
    """
    project_path = Path(project_path).resolve()
    repo_id = self.graph.upsert_repo(repo_name, str(project_path), repo_type="sqlmesh")

    # Build schema catalog from existing index for SELECT * expansion
    schema_catalog = self.graph.get_table_columns(repo_id) or None

    # Check if source files changed — skip rendering subprocess if not.
    # The explicit ``is not None`` guard prevents a spurious hit when no
    # fingerprint has ever been stored.
    current_fingerprint = _source_fingerprint(project_path)
    stored_fingerprint = self.graph.get_source_fingerprint(repo_id)
    cache_hit = (stored_fingerprint == current_fingerprint) and stored_fingerprint is not None

    renderer = self.get_sqlmesh_renderer(dialect)

    if cache_hit:
        # Source unchanged — load cached rendered SQL.
        # Cache entries are (sql, schemas) pairs keyed by model name.
        cached = self.graph.get_rendered_cache(repo_id)
        if cached:
            logger.info("Source unchanged, using cached rendered SQL (%d models)", len(cached))
            raw_models = {name: sql for name, (sql, _) in cached.items()}
            column_schemas = {name: schemas for name, (_, schemas) in cached.items()}
        else:
            # Fingerprint matched but the cache is empty — fall through
            # to a full render so raw_models/column_schemas get populated.
            cache_hit = False

    if not cache_hit:
        # Render from scratch via subprocess
        raw_models, column_schemas = renderer.render_project_raw(
            project_path=project_path,
            env_file=env_file,
            variables=variables,
            dialect=dialect,
            sqlmesh_command=sqlmesh_command,
            venv_dir=venv_dir,
        )

    # Parse rendered SQL with current schema catalog (always re-parse
    # to allow schema enrichment even when rendering was cached).
    # 20 is the threshold where parallel parsing pays for its overhead
    # — presumably tuned empirically; TODO confirm.
    if len(raw_models) >= 20:
        rendered = renderer._parse_models_parallel(raw_models, column_schemas, schema_catalog)
    else:
        rendered = renderer._parse_models_sequential(raw_models, column_schemas, schema_catalog)

    stats = {
        "models_rendered": len(rendered),
        "models_skipped": 0,
        "models_removed": 0,
        "render_cached": cache_hit,
        "nodes_added": 0,
        "edges_added": 0,
        "column_usage_added": 0,
        "columns_added": 0,
        "lineage_chains": 0,
    }

    with self.graph.write_transaction():
        # Load existing checksums inside transaction to avoid TOCTOU
        existing_checksums = self.graph.get_file_checksums(repo_id)

        # Track which file paths are in the current render
        seen_paths: set[str] = set()

        for model_name, result in rendered.items():
            # Model names arrive quoted like "db"."model"; map them to a
            # path-like file name ("db/model.sql") for storage.
            clean_name = model_name.strip('"').replace('"."', "/")
            file_path = clean_name + ".sql"
            seen_paths.add(file_path)

            checksum = _checksum_parse_result(result)

            # Skip if checksum matches — model hasn't changed
            if existing_checksums.get(file_path) == checksum:
                stats["models_skipped"] += 1
                continue

            self.graph.delete_file_data(repo_id, file_path)
            file_id = self.graph.insert_file(repo_id, file_path, "sql", checksum)
            self._insert_parse_result(result, file_id, repo_id, stats)

        # Delete stale models that no longer exist in the project
        for stale_path in set(existing_checksums) - seen_paths:
            self.graph.delete_file_data(repo_id, stale_path)
            stats["models_removed"] += 1

        # Cleanup phantoms inside the transaction for atomicity.
        # The binding survives the ``with`` block and is read below.
        phantoms_cleaned = self.graph.cleanup_phantoms()

    # Update fingerprint and render cache only after the transaction has
    # committed, so a crash mid-index never records a stale fingerprint.
    self.graph.update_source_fingerprint(repo_id, current_fingerprint)
    if not cache_hit:
        self.graph.update_rendered_cache(repo_id, raw_models, column_schemas)

    commit, branch = self._get_git_info(project_path)
    self.graph.update_repo_metadata(repo_id, commit=commit, branch=branch)

    self._run_convention_inference(repo_id, project_path=project_path)
    self.graph.refresh_property_graph()
    self.graph.clear_snippet_cache()
    stats["phantoms_cleaned"] = phantoms_cleaned
    return stats

reindex_dbt

reindex_dbt(
    repo_name,
    project_path,
    profiles_dir=None,
    env_file=None,
    target=None,
    dbt_command="uv run dbt",
    venv_dir=None,
    dialect=None,
)

Index a dbt project by compiling all models first.

Runs dbt compile via DbtRenderer, then parses each compiled SQL file and inserts results into the graph.

Parameters:

Name Type Description Default
repo_name str

Repo name in the index.

required
project_path str | Path

Path to the dbt project directory (containing dbt_project.yml).

required
profiles_dir str | Path | None

Path to the directory containing profiles.yml.

None
env_file str | Path | None

Optional .env file to source before compilation.

None
target str | None

dbt target name override.

None
dbt_command str

Command to invoke dbt (e.g. "uv run dbt").

'uv run dbt'
venv_dir str | Path | None

Directory to run from (where .venv lives).

None
dialect str | None

SQL dialect for parsing compiled output.

None

Returns:

Type Description
dict

Stats dict with keys models_compiled, nodes_added,

dict

edges_added, column_usage_added, and lineage_chains.

Source code in src/sqlprism/core/indexer.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def reindex_dbt(
    self,
    repo_name: str,
    project_path: str | Path,
    profiles_dir: str | Path | None = None,
    env_file: str | Path | None = None,
    target: str | None = None,
    dbt_command: str = "uv run dbt",
    venv_dir: str | Path | None = None,
    dialect: str | None = None,
) -> dict:
    """Index a dbt project by compiling all models first.

    Runs ``dbt compile`` via ``DbtRenderer``, then parses each
    compiled SQL file and inserts results into the graph.

    Args:
        repo_name: Repo name in the index.
        project_path: Path to the dbt project directory
            (containing ``dbt_project.yml``).
        profiles_dir: Path to the directory containing ``profiles.yml``.
        env_file: Optional ``.env`` file to source before compilation.
        target: dbt target name override.
        dbt_command: Command to invoke dbt (e.g. ``"uv run dbt"``).
        venv_dir: Directory to run from (where ``.venv`` lives).
        dialect: SQL dialect for parsing compiled output.

    Returns:
        Stats dict with keys ``models_compiled``, ``nodes_added``,
        ``edges_added``, ``column_usage_added``, ``columns_added``,
        and ``lineage_chains``.
    """
    project_path = Path(project_path).resolve()
    repo_id = self.graph.upsert_repo(repo_name, str(project_path), repo_type="dbt")

    # Build schema catalog from existing index for SELECT * expansion
    schema_catalog = self.graph.get_table_columns(repo_id) or None

    # NOTE(review): ``self.dbt_renderer`` (no underscore) is presumably a
    # lazy property defined elsewhere; __init__ only sets ``_dbt_renderer``.
    # Confirm the accessor exists.
    rendered = self.dbt_renderer.render_project(
        project_path=project_path,
        profiles_dir=profiles_dir,
        env_file=env_file,
        target=target,
        dbt_command=dbt_command,
        venv_dir=venv_dir,
        dialect=dialect,
        schema_catalog=schema_catalog,
    )

    stats = {
        "models_compiled": len(rendered),
        "nodes_added": 0,
        "edges_added": 0,
        "column_usage_added": 0,
        "columns_added": 0,
        "lineage_chains": 0,
    }

    for model_path, result in rendered.items():
        # Wrap delete + insert per model in a transaction for atomicity
        with self.graph.write_transaction():
            self.graph.delete_file_data(repo_id, model_path)
            checksum = _checksum_parse_result(result)
            file_id = self.graph.insert_file(repo_id, model_path, "sql", checksum)
            self._insert_parse_result(result, file_id, repo_id, stats)

    # NOTE(review): unlike reindex_repo/reindex_sqlmesh, there is no
    # cleanup_phantoms() call here — confirm this is intentional.
    commit, branch = self._get_git_info(project_path)
    self.graph.update_repo_metadata(repo_id, commit=commit, branch=branch)

    self._run_convention_inference(repo_id, project_path=project_path)
    self.graph.refresh_property_graph()
    self.graph.clear_snippet_cache()
    return stats

reindex_files

reindex_files(paths, repo_configs=None)

Reindex specific files. Fast path for on-save.

Resolves each path to its repo, determines repo type (plain SQL, dbt, sqlmesh), and reindexes accordingly.

For plain SQL: read, parse, insert immediately. For dbt/sqlmesh: call render_models() for the affected models, then parse and insert the rendered SQL.

Parameters:

Name Type Description Default
paths list[str | Path]

Absolute file paths that changed. Non-SQL files and files outside configured repos are silently skipped.

required
repo_configs dict | None

Optional repo config dict (repo_name → config). Required for dbt/sqlmesh repos to pass renderer params. If not provided, only plain SQL repos are supported.

None

Returns:

Type Description
dict

Stats dict with reindexed, skipped, deleted,

dict

errors, and per-file details.

Source code in src/sqlprism/core/indexer.py
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def reindex_files(self, paths: list[str | Path], repo_configs: dict | None = None) -> dict:
    """Reindex specific files. Fast path for on-save.

    Each path is resolved to its owning repo; files are then grouped
    per repo and reindexed by repo type (plain SQL, dbt, sqlmesh).

    Plain SQL files are read, parsed, and inserted directly; dbt and
    sqlmesh files go through ``render_models()`` for the affected
    models before their rendered SQL is parsed and inserted.

    Args:
        paths: Absolute file paths that changed. Non-SQL files and
            files outside configured repos are silently skipped.
        repo_configs: Optional repo config dict (repo_name → config).
            Required for dbt/sqlmesh repos to pass renderer params.
            If not provided, only plain SQL repos are supported.

    Returns:
        Stats dict with ``reindexed``, ``skipped``, ``deleted``,
        ``errors``, and per-file ``details``.
    """
    repo_configs = repo_configs or {}
    stats = {
        "reindexed": 0,
        "skipped": 0,
        "deleted": 0,
        "errors": [],
        "details": [],
    }

    # One repo-list fetch shared by every path resolution (avoid N queries).
    all_repos = self.graph.get_all_repos()

    grouped: dict[int, list[Path]] = {}
    repo_meta: dict[int, tuple[str, str, str]] = {}  # repo_id → (name, path, type)

    for candidate in paths:
        abs_path = Path(candidate).resolve()

        # Drop anything that isn't a SQL file.
        if not is_sql_file(str(abs_path)):
            stats["skipped"] += 1
            stats["details"].append({"path": str(abs_path), "status": "skipped", "reason": "not a SQL file"})
            continue

        match = self._resolve_file_repo(abs_path, all_repos)
        if match is None:
            stats["skipped"] += 1
            stats["details"].append({"path": str(abs_path), "status": "skipped", "reason": "no matching repo"})
            continue

        repo_id, repo_name, repo_path, repo_type = match
        grouped.setdefault(repo_id, []).append(abs_path)
        repo_meta[repo_id] = (repo_name, repo_path, repo_type)

    # Dispatch each repo group to the handler for its repo type.
    handlers = {
        "sql": self._reindex_sql_files,
        "dbt": self._reindex_dbt_files,
        "sqlmesh": self._reindex_sqlmesh_files,
    }
    for repo_id, group in grouped.items():
        repo_name, repo_path, repo_type = repo_meta[repo_id]
        cfg = repo_configs.get(repo_name, {})
        handler = handlers.get(repo_type)
        if handler is not None:
            handler(repo_id, Path(repo_path), group, stats, cfg)
        else:
            for f in group:
                stats["skipped"] += 1
                stats["details"].append({
                    "path": str(f), "status": "skipped", "reason": f"unknown repo_type '{repo_type}'",
                })

    # Single refresh after all file groups processed (not per-file).
    if grouped:
        self.graph.refresh_property_graph()

    return stats

parse_file

parse_file(file_path, content, dialect=None, schema=None)

Parse a single SQL file without writing to the database.

Parameters:

Name Type Description Default
file_path str

File path (used for naming nodes, not read from disk).

required
content str

Raw SQL content.

required
dialect str | None

Optional SQL dialect override.

None
schema dict | None

Optional schema catalog for SELECT * expansion.

None

Returns:

Type Description
ParseResult

A ParseResult with extracted nodes, edges, column usage,

ParseResult

and lineage. Returns an empty result for non-SQL files.

Source code in src/sqlprism/core/indexer.py
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
def parse_file(
    self,
    file_path: str,
    content: str,
    dialect: str | None = None,
    schema: dict | None = None,
) -> ParseResult:
    """Parse a single SQL file without touching the database.

    Args:
        file_path: File path (used for naming nodes, not read from disk).
        content: Raw SQL content.
        dialect: Optional SQL dialect override.
        schema: Optional schema catalog for ``SELECT *`` expansion.

    Returns:
        A ``ParseResult`` with extracted nodes, edges, column usage,
        and lineage. Non-SQL files yield an empty result.
    """
    if is_sql_file(file_path):
        parser = self.get_parser(dialect)
        return parser.parse(file_path, content, schema=schema)
    # Non-SQL input: empty result rather than an error.
    return ParseResult(language="sql")

parse_file_at_commit

parse_file_at_commit(
    repo_path, file_path, commit, dialect=None
)

Parse a file at a specific git commit.

Retrieves file content via git show and parses it without writing to the database. Used by pr_impact analysis.

Parameters:

Name Type Description Default
repo_path Path

Absolute path to the git repo root.

required
file_path str

Relative file path within the repo.

required
commit str

Git commit hash or ref to read from.

required
dialect str | None

Optional SQL dialect override.

None

Returns:

Type Description
ParseResult | None

A ParseResult, or None if the file doesn't exist at

ParseResult | None

that commit or is not a SQL file.

Source code in src/sqlprism/core/indexer.py
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
def parse_file_at_commit(
    self,
    repo_path: Path,
    file_path: str,
    commit: str,
    dialect: str | None = None,
) -> ParseResult | None:
    """Parse a file as it existed at a specific git commit.

    Fetches the file content with ``git show`` and parses it without
    writing anything to the database. Used by pr_impact analysis.

    Args:
        repo_path: Absolute path to the git repo root.
        file_path: Relative file path within the repo.
        commit: Git commit hash or ref to read from.
        dialect: Optional SQL dialect override.

    Returns:
        A ``ParseResult``, or ``None`` if the file doesn't exist at
        that commit or is not a SQL file.
    """
    if not is_sql_file(file_path):
        return None
    try:
        proc = subprocess.run(
            ["git", "show", f"{commit}:{file_path}"],
            cwd=repo_path,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        # git missing or hung — treat as "file unavailable".
        return None
    if proc.returncode != 0:
        # e.g. the path does not exist at that commit.
        return None

    return self.get_parser(dialect).parse(file_path, proc.stdout)

get_changed_files

get_changed_files(repo_path, base_commit)

Get SQL files changed between a base commit and HEAD.

Parameters:

Name Type Description Default
repo_path Path

Absolute path to the git repo root.

required
base_commit str

Git commit hash or ref to diff against HEAD.

required

Returns:

Type Description
list[str]

List of relative file paths for changed SQL files. Returns

list[str]

an empty list on git errors or timeouts.

Source code in src/sqlprism/core/indexer.py
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
def get_changed_files(self, repo_path: Path, base_commit: str) -> list[str]:
    """List SQL files changed between ``base_commit`` and HEAD.

    Args:
        repo_path: Absolute path to the git repo root.
        base_commit: Git commit hash or ref to diff against HEAD.

    Returns:
        Relative paths of changed SQL files; empty list on any git
        error or timeout.
    """
    try:
        proc = subprocess.run(
            ["git", "diff", "--name-only", f"{base_commit}..HEAD"],
            cwd=repo_path,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return []
    if proc.returncode != 0:
        return []
    changed: list[str] = []
    for raw_line in proc.stdout.strip().split("\n"):
        candidate = raw_line.strip()
        if candidate and is_sql_file(candidate):
            changed.append(candidate)
    return changed