Skip to content

MCP Server

FastMCP tool definitions — the interface LLMs interact with. Tools are provider-agnostic; any MCP client can connect via stdio or streamable HTTP.

MCP server exposing SQL indexer tools.

This is the interface LLMs interact with. Tools are provider-agnostic — any MCP client (Claude, Cursor, Continue.dev, etc.) can connect via stdio or streamable HTTP.

Focused entirely on SQL: tables, views, CTEs, column lineage, transforms, WHERE filters, and dependency tracing across dialects.

configure

configure(db_path, repos, sql_dialect=None)

Initialise the graph and indexer with repo configuration.

Parameters:

Name Type Description Default
db_path str | Path

Path to DuckDB file

required
repos dict

{repo_name: path_or_config} — value is either a string path or a dict with "path", "dialect", "dialect_overrides" keys

required
sql_dialect str | None

Global fallback SQL dialect (overridden by per-repo config)

None

Thread-safety: builds a new immutable _ServerState and swaps it in with a single assignment, so concurrent readers never see partial updates.

Source code in src/sqlprism/core/mcp_tools.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def configure(db_path: str | Path, repos: dict, sql_dialect: str | None = None):
    """Create the graph and indexer for a set of repos and publish them as server state.

    Args:
        db_path: Location of the DuckDB file backing the graph.
        repos: Mapping of ``{repo_name: path_or_config}``. Each value is either
            a plain path string, or a dict that must carry a ``"path"`` key and
            may carry ``"repo_type"`` (defaults to ``"sql"``), ``"dialect"``
            and ``"dialect_overrides"``.
        sql_dialect: Global fallback SQL dialect (per-repo config takes
            precedence).

    Thread-safety: a complete new ``_ServerState`` is assembled first and then
    bound to the module global in one assignment, so concurrent readers never
    observe a half-initialised state.
    """
    global _state
    new_graph = GraphDB(db_path)
    new_indexer = Indexer(new_graph)
    new_config = {"db_path": str(db_path), "repos": repos, "sql_dialect": sql_dialect}

    # All repos are registered on the new graph before it becomes visible.
    for repo_name, repo_cfg in repos.items():
        if isinstance(repo_cfg, dict):
            repo_path = repo_cfg["path"]
            repo_type = repo_cfg.get("repo_type", "sql")
        else:
            # A bare value is the repo path itself; the type defaults to SQL.
            repo_path = repo_cfg
            repo_type = "sql"
        new_graph.upsert_repo(repo_name, repo_path, repo_type=repo_type)

    # Single assignment publishes graph, indexer and config together.
    _state = _ServerState(graph=new_graph, indexer=new_indexer, config=new_config)

search async

search(params)

Search for SQL entities by name across the codebase graph.

Finds tables, views, CTEs, and queries by partial name match. Returns matches with name, kind, file path, repo, and line numbers.

Source code in src/sqlprism/core/mcp_tools.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@mcp.tool(
    name="search",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def search(params: SearchInput) -> dict:
    """Search for SQL entities by name across the codebase graph.

    Finds tables, views, CTEs, and queries by partial name match.
    Returns matches with name, kind, file path, repo, and line numbers.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_search
    query_kwargs = {
        "pattern": params.pattern,
        "kind": params.kind,
        # The input model calls this field ``sql_schema``; the graph API calls it ``schema``.
        "schema": params.sql_schema,
        "repo": params.repo,
        "limit": params.limit,
        "offset": params.offset,
        "include_snippets": params.include_snippets,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

find_references async

find_references(params)

Find everything connected to a named SQL entity.

Returns both inbound (what depends on this) and outbound (what this depends on) relationships. Each result includes: name, kind, relationship type, file path, repo.

Source code in src/sqlprism/core/mcp_tools.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@mcp.tool(
    name="find_references",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def find_references(params: FindReferencesInput) -> dict:
    """Find everything connected to a named SQL entity.

    Returns both inbound (what depends on this) and outbound (what this depends on)
    relationships. Each result includes: name, kind, relationship type, file path, repo.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_references
    query_kwargs = {
        "name": params.name,
        "kind": params.kind,
        # The input model calls this field ``sql_schema``; the graph API calls it ``schema``.
        "schema": params.sql_schema,
        "repo": params.repo,
        "direction": params.direction,
        "include_snippets": params.include_snippets,
        "limit": params.limit,
        "offset": params.offset,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

find_column_usage async

find_column_usage(params)

Find where and how columns are used across SQL models.

Powered by sqlglot's column lineage analysis. Shows usage type, transforms (CAST, COALESCE, etc.), output aliases, and WHERE conditions.

Answers: "where is customer_id used in WHERE clauses?", "how is animal.breed_id transformed?", "show all column usage on orders."

Source code in src/sqlprism/core/mcp_tools.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
@mcp.tool(
    name="find_column_usage",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def find_column_usage(params: FindColumnUsageInput) -> dict:
    """Find where and how columns are used across SQL models.

    Powered by sqlglot's column lineage analysis. Shows usage type,
    transforms (CAST, COALESCE, etc.), output aliases, and WHERE conditions.

    Answers: "where is customer_id used in WHERE clauses?",
    "how is animal.breed_id transformed?", "show all column usage on orders."
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_column_usage
    query_kwargs = {
        "table": params.table,
        "column": params.column,
        "usage_type": params.usage_type,
        "repo": params.repo,
        "limit": params.limit,
        "offset": params.offset,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

trace_dependencies async

trace_dependencies(params)

Trace multi-hop dependency chains through the SQL graph.

Follows table → view → CTE → query chains. Use for impact analysis: "if I change this table, what models break?"

Source code in src/sqlprism/core/mcp_tools.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
@mcp.tool(
    name="trace_dependencies",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def trace_dependencies(params: TraceDependenciesInput) -> dict:
    """Trace multi-hop dependency chains through the SQL graph.

    Follows table → view → CTE → query chains. Use for impact analysis:
    "if I change this table, what models break?"
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_trace
    query_kwargs = {
        "name": params.name,
        "kind": params.kind,
        "direction": params.direction,
        "max_depth": params.max_depth,
        "repo": params.repo,
        "include_snippets": params.include_snippets,
        "limit": params.limit,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

trace_column_lineage async

trace_column_lineage(params)

Trace end-to-end column lineage through CTEs and subqueries.

Shows how an output column traces back to source table columns, with each intermediate hop (CTE, subquery) and any transforms (CAST, etc.).

Answers: "where does dim_users.created_date come from?", "which output columns depend on orders.amount?"

Note: SELECT * lineage requires a schema catalog built from prior column usage data. On a fresh index, SELECT * columns may not be expanded. Run a second full reindex to populate the catalog and resolve them.

Source code in src/sqlprism/core/mcp_tools.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
@mcp.tool(
    name="trace_column_lineage",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def trace_column_lineage(params: TraceColumnLineageInput) -> dict:
    """Trace end-to-end column lineage through CTEs and subqueries.

    Shows how an output column traces back to source table columns, with
    each intermediate hop (CTE, subquery) and any transforms (CAST, etc.).

    Answers: "where does dim_users.created_date come from?",
    "which output columns depend on orders.amount?"

    Note: SELECT * lineage requires a schema catalog built from prior column
    usage data. On a fresh index, SELECT * columns may not be expanded.
    Run a second full reindex to populate the catalog and resolve them.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_column_lineage
    query_kwargs = {
        "table": params.table,
        "column": params.column,
        "output_node": params.output_node,
        "repo": params.repo,
        "limit": params.limit,
        "offset": params.offset,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

get_schema async

get_schema(params)

Get the schema of a table or model — columns, types, descriptions, and dependencies.

Returns column definitions (name, type, position, source, description), upstream dependencies (what this model reads from), and downstream dependencies (what reads from this model). The primary tool for understanding table structure.

Source code in src/sqlprism/core/mcp_tools.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
@mcp.tool(
    name="get_schema",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def get_schema(params: GetSchemaInput) -> dict:
    """Get the schema of a table or model — columns, types, descriptions, and dependencies.

    Returns column definitions (name, type, position, source, description),
    upstream dependencies (what this model reads from), and downstream
    dependencies (what reads from this model). The primary tool for
    understanding table structure.
    """
    # Bind the query method, then hand it to a worker thread via asyncio.to_thread.
    run_query = _get_graph().query_schema
    schema_info = await asyncio.to_thread(run_query, name=params.name, repo=params.repo)
    return schema_info

get_context async

get_context(params)

Get comprehensive context for a model — the first tool to call when working with a model.

Returns a complete context dump including:

- Model metadata (name, kind, file, repo)
- Column definitions with types and descriptions
- Upstream and downstream dependencies
- Column usage summary (most used columns, join keys, aggregations)
- Source code snippet (first 30 lines)
- Graph metrics (PageRank importance) when DuckPGQ is available

Source code in src/sqlprism/core/mcp_tools.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
@mcp.tool(
    name="get_context",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def get_context(params: GetContextInput) -> dict:
    """Get comprehensive context for a model — the first tool to call when working with a model.

    Returns a complete context dump including:
    - Model metadata (name, kind, file, repo)
    - Column definitions with types and descriptions
    - Upstream and downstream dependencies
    - Column usage summary (most used columns, join keys, aggregations)
    - Source code snippet (first 30 lines)
    - Graph metrics (PageRank importance) when DuckPGQ is available
    """
    # Bind the query method, then hand it to a worker thread via asyncio.to_thread.
    run_query = _get_graph().query_context
    context_info = await asyncio.to_thread(run_query, name=params.name, repo=params.repo)
    return context_info

find_path async

find_path(params)

Find the shortest dependency path between two models.

Uses DuckPGQ graph traversal to find the shortest chain of dependencies connecting two models. Returns the full path with intermediate models and path length.

Requires DuckPGQ extension. Returns an error if not installed.

Source code in src/sqlprism/core/mcp_tools.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
@mcp.tool(
    name="find_path",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def find_path(params: FindPathInput) -> dict:
    """Find the shortest dependency path between two models.

    Uses DuckPGQ graph traversal to find the shortest chain of
    dependencies connecting two models. Returns the full path
    with intermediate models and path length.

    Requires DuckPGQ extension. Returns an error if not installed.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_find_path
    query_kwargs = {
        "from_model": params.from_model,
        "to_model": params.to_model,
        "max_hops": params.max_hops,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

find_critical_models async

find_critical_models(params)

Find the most critical models by importance (PageRank) and downstream impact.

Ranks models by their graph centrality — models with high PageRank are referenced by many important models. Use to identify high-impact models that need extra care when modifying.

Requires DuckPGQ extension.

Source code in src/sqlprism/core/mcp_tools.py
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
@mcp.tool(
    name="find_critical_models",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def find_critical_models(params: FindCriticalModelsInput) -> dict:
    """Find the most critical models by importance (PageRank) and downstream impact.

    Ranks models by their graph centrality — models with high PageRank are
    referenced by many important models. Use to identify high-impact models
    that need extra care when modifying.

    Requires DuckPGQ extension.
    """
    # Bind the query method, then hand it to a worker thread via asyncio.to_thread.
    run_query = _get_graph().query_find_critical_models
    ranking = await asyncio.to_thread(run_query, top_n=params.top_n, repo=params.repo)
    return ranking

detect_cycles async

detect_cycles(params)

Detect circular dependencies in the SQL dependency graph.

Finds cycles where models form dependency loops (A -> B -> C -> A). Uses recursive CTE traversal — no DuckPGQ extension required.

Source code in src/sqlprism/core/mcp_tools.py
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
@mcp.tool(
    name="detect_cycles",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def detect_cycles(params: DetectCyclesInput) -> dict:
    """Detect circular dependencies in the SQL dependency graph.

    Finds cycles where models form dependency loops (A -> B -> C -> A).
    Uses recursive CTE traversal — no DuckPGQ extension required.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_detect_cycles
    query_kwargs = {
        "repo": params.repo,
        "max_cycle_length": params.max_cycle_length,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

find_subgraphs async

find_subgraphs(params)

Identify weakly connected components (subgraphs) in the dependency graph.

Reveals isolated model clusters, orphaned models, and overall graph topology. Requires DuckPGQ extension.

Source code in src/sqlprism/core/mcp_tools.py
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
@mcp.tool(
    name="find_subgraphs",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def find_subgraphs(params: FindSubgraphsInput) -> dict:
    """Identify weakly connected components (subgraphs) in the dependency graph.

    Reveals isolated model clusters, orphaned models, and overall graph topology.
    Requires DuckPGQ extension.
    """
    # Bind the query method, then hand it to a worker thread via asyncio.to_thread.
    run_query = _get_graph().query_find_subgraphs
    components = await asyncio.to_thread(run_query, repo=params.repo)
    return components

find_bottlenecks async

find_bottlenecks(params)

Find bottleneck models with high fan-in/out that are single points of failure.

Combines edge counting (plain SQL) with optional DuckPGQ clustering coefficient. Models with high downstream count and low clustering are flagged as high risk.

Source code in src/sqlprism/core/mcp_tools.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
@mcp.tool(
    name="find_bottlenecks",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def find_bottlenecks(params: FindBottlenecksInput) -> dict:
    """Find bottleneck models with high fan-in/out that are single points of failure.

    Combines edge counting (plain SQL) with optional DuckPGQ clustering coefficient.
    Models with high downstream count and low clustering are flagged as high risk.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_find_bottlenecks
    query_kwargs = {
        "min_downstream": params.min_downstream,
        "repo": params.repo,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

check_impact async

check_impact(params)

Check the downstream impact of proposed column changes BEFORE modifying code.

Analyzes column usage across downstream models to classify each change as:

- breaking: SELECT/JOIN usage — downstream model will error
- warning: WHERE/GROUP BY usage — filter breaks but model may not error
- safe: column not referenced downstream

Call this BEFORE removing, renaming, or adding columns to understand the blast radius.

Note: add_column does not detect SELECT * usage — downstream models using wildcard selects may still be affected by new columns.

Source code in src/sqlprism/core/mcp_tools.py
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
@mcp.tool(
    name="check_impact",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def check_impact(params: CheckImpactInput) -> dict:
    """Check the downstream impact of proposed column changes BEFORE modifying code.

    Analyzes column usage across downstream models to classify each change as:
    - **breaking**: SELECT/JOIN usage — downstream model will error
    - **warning**: WHERE/GROUP BY usage — filter breaks but model may not error
    - **safe**: column not referenced downstream

    Call this BEFORE removing, renaming, or adding columns to understand the blast radius.

    Note: ``add_column`` does not detect ``SELECT *`` usage — downstream models
    using wildcard selects may still be affected by new columns.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_check_impact
    query_kwargs = {
        "model": params.model,
        # Each change object is dumped via ``model_dump()`` before being passed to the graph.
        "changes": [change.model_dump() for change in params.changes],
        "repo": params.repo,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

pr_impact async

pr_impact(params)

Analyse the structural impact of SQL changes since a base commit.

Computes structural diff (added/removed/modified tables, views, CTEs, column usage) then traces the blast radius through the full index.

Delta mode caveat: compare_mode="delta" shows net-new downstream impact by approximating the base blast radius via edge exclusion on the HEAD graph. It does not detect reduced blast radius from removed edges — no_longer_affected will be empty when a PR only removes dependencies. Use compare_mode="absolute" for a full picture when edge removals are the primary change.

Source code in src/sqlprism/core/mcp_tools.py
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
@mcp.tool(
    name="pr_impact",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=False,
        openWorldHint=False,
    ),
)
async def pr_impact(params: PrImpactInput) -> dict:
    """Analyse the structural impact of SQL changes since a base commit.

    Computes structural diff (added/removed/modified tables, views, CTEs,
    column usage) then traces the blast radius through the full index.

    **Delta mode caveat:** ``compare_mode="delta"`` shows **net-new downstream
    impact** by approximating the base blast radius via edge exclusion on the
    HEAD graph.  It does **not** detect reduced blast radius from removed edges
    — ``no_longer_affected`` will be empty when a PR only removes dependencies.
    Use ``compare_mode="absolute"`` for a full picture when edge removals are
    the primary change.
    """
    # Take one snapshot of the server state so graph, indexer and config all
    # come from the same configuration.
    state = _get_state()
    indexer = state.indexer
    graph = state.graph
    config = state.config

    # Determine which repo: an explicit ``params.repo`` wins; otherwise fall
    # back to the sole configured repo; with several repos and no choice,
    # return an error dict.
    if params.repo:
        path, dialect, _dialect_overrides = _resolve_repo_config(params.repo)
        repo_path = Path(path)
    elif len(config["repos"]) == 1:
        repo_name = next(iter(config["repos"].keys()))
        path, dialect, _dialect_overrides = _resolve_repo_config(repo_name)
        repo_path = Path(path)
    else:
        return {"error": "Multiple repos configured — specify which repo to analyse."}

    def _blocking_pr_impact() -> dict:
        # Everything in here runs in a worker thread (see asyncio.to_thread below).
        changed_files = indexer.get_changed_files(repo_path, params.base_commit)
        if not changed_files:
            return {"files_changed": [], "structural_diff": {}, "blast_radius": {}}

        old_results: dict[str, ParseResult] = {}
        new_results: dict[str, ParseResult] = {}

        for file_path in changed_files:
            # HEAD side: parse the working-tree file if it still exists and is SQL.
            full_path = repo_path / file_path
            if full_path.exists() and is_sql_file(file_path):
                content = full_path.read_text(errors="replace")
                new_results[file_path] = indexer.parse_file(file_path, content, dialect)

            # Base side: parse the file as of the base commit; a falsy result is skipped.
            old = indexer.parse_file_at_commit(repo_path, file_path, params.base_commit, dialect)
            if old:
                old_results[file_path] = old

        diff = _compute_structural_diff(old_results, new_results)

        # Unique, sorted names of every added/removed/modified node.  The same
        # name can occur in more than one diff entry (e.g. a CTE defined in
        # several changed files, or a model moved between files).  Traces are
        # looked up by name only, so repeating a name would waste slots in the
        # 20-node cap below and double-count its paths in absolute mode.
        affected_node_names = sorted(
            {
                n["name"]
                for bucket in ("nodes_added", "nodes_removed", "nodes_modified")
                for n in diff[bucket]
            }
        )

        # Names of truly new nodes (no base trace needed for these)
        added_names = {n["name"] for n in diff["nodes_added"]}

        # Build exclude set: edges added in HEAD that did not exist at base
        edges_added_set: set[tuple[str, str]] = {(e["source"], e["target"]) for e in diff.get("edges_added", [])}

        is_delta = params.compare_mode == "delta"

        blast_radius: dict = {}
        if affected_node_names:
            head_affected: set[tuple[str, str]] = set()
            base_affected: set[tuple[str, str]] = set()
            all_head_paths: list[dict] = []  # flat list for repo counting
            repos_hit: set[str] = set()
            # Only the first 20 names (in sorted order) are traced.
            truncated = len(affected_node_names) > 20

            for node_name in affected_node_names[:20]:
                # HEAD blast radius (current graph)
                head_trace = graph.query_trace(
                    name=node_name,
                    direction="downstream",
                    max_depth=params.max_blast_radius_depth,
                )
                head_paths = head_trace.get("paths", [])
                head_affected.update((p["name"], p["kind"]) for p in head_paths)
                all_head_paths.extend(head_paths)
                repos_hit.update(head_trace.get("repos_affected", []))

                # Base blast radius approximation (exclude new edges)
                if is_delta and node_name not in added_names:
                    base_trace = graph.query_trace(
                        name=node_name,
                        direction="downstream",
                        max_depth=params.max_blast_radius_depth,
                        exclude_edges=edges_added_set,
                    )
                    base_affected.update((p["name"], p["kind"]) for p in base_trace.get("paths", []))

            if is_delta:
                newly_affected = head_affected - base_affected
                no_longer_affected = base_affected - head_affected

                blast_radius = {
                    "compare_mode": "delta",
                    "head_total": len(head_affected),
                    "base_total": len(base_affected),
                    "delta": len(head_affected) - len(base_affected),
                    "newly_affected": [{"name": n, "kind": k} for n, k in sorted(newly_affected)],
                    "no_longer_affected": [{"name": n, "kind": k} for n, k in sorted(no_longer_affected)],
                    "unchanged_affected": len(head_affected & base_affected),
                    "note": (
                        "Delta mode approximates the base blast radius by "
                        "excluding newly-added edges from the HEAD graph. "
                        "It shows net-new downstream impact but does not "
                        "detect reduced blast radius from removed edges."
                    ),
                    # Backward-compat fields
                    "transitively_affected": len(head_affected),
                    "repos_affected": sorted(repos_hit),
                    "truncated": truncated,
                    "total_affected_nodes": len(affected_node_names),
                }
            else:
                # Absolute mode (v1 behavior): counts are taken over the flat
                # list of traced paths rather than over unique (name, kind) pairs.
                blast_radius = {
                    "compare_mode": "absolute",
                    "transitively_affected": len(all_head_paths),
                    "affected_by_repo": {r: sum(1 for a in all_head_paths if a.get("repo") == r) for r in repos_hit},
                    "repos_affected": sorted(repos_hit),
                    "truncated": truncated,
                    "total_affected_nodes": len(affected_node_names),
                }

            if truncated:
                blast_radius["truncation_message"] = (
                    f"Blast radius incomplete — {len(affected_node_names)} affected nodes, "
                    "only first 20 traced. Use trace_dependencies "
                    "on specific nodes for full picture."
                )

        return {
            "files_changed": changed_files,
            "structural_diff": diff,
            "blast_radius": blast_radius,
        }

    return await asyncio.to_thread(_blocking_pr_impact)

get_conventions async

get_conventions(params)

Get naming conventions, reference rules, and required columns for a layer.

Returns inferred conventions with confidence scores. Agents should follow high-confidence conventions (>0.9) and ask about low-confidence ones (<0.7).

Use this before writing new models to understand project patterns: naming conventions, allowed layer references, required columns, and column naming style.

Source code in src/sqlprism/core/mcp_tools.py
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
@mcp.tool(
    name="get_conventions",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def get_conventions(params: GetConventionsInput) -> dict:
    """Get naming conventions, reference rules, and required columns for a layer.

    Returns inferred conventions with confidence scores. Agents should follow
    high-confidence conventions (>0.9) and ask about low-confidence ones (<0.7).

    Use this before writing new models to understand project patterns:
    naming conventions, allowed layer references, required columns, and
    column naming style.
    """
    # Bind the query method, then hand it to a worker thread via asyncio.to_thread.
    run_query = _get_graph().query_conventions
    conventions = await asyncio.to_thread(run_query, layer=params.layer, repo=params.repo)
    return conventions

search_by_tag async

search_by_tag(params)

Find models tagged with a business domain concept, ranked by confidence.

Returns models whose semantic tags match the given tag name, ordered by confidence score (highest first). Use list_tags first to discover the available tags in the project's business domain vocabulary.

Source code in src/sqlprism/core/mcp_tools.py
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
@mcp.tool(
    name="search_by_tag",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def search_by_tag(params: SearchByTagInput) -> dict:
    """Find models tagged with a business domain concept, ranked by confidence.

    Returns models whose semantic tags match the given tag name, ordered by
    confidence score (highest first). Use list_tags first to discover the
    available tags in the project's business domain vocabulary.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_search_by_tag
    query_kwargs = {
        "tag": params.tag,
        "repo": params.repo,
        "min_confidence": params.min_confidence,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

list_tags async

list_tags(params)

Return all semantic tags with model counts and average confidence.

Provides the project's business domain vocabulary — the set of conceptual tags that have been assigned to models. Use this to discover available tags before calling search_by_tag.

Source code in src/sqlprism/core/mcp_tools.py
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
@mcp.tool(
    name="list_tags",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def list_tags(params: ListTagsInput) -> dict:
    """Return all semantic tags with model counts and average confidence.

    Provides the project's business domain vocabulary — the set of conceptual
    tags that have been assigned to models. Use this to discover available tags
    before calling search_by_tag.
    """
    # Bind the query method, then hand it to a worker thread via asyncio.to_thread.
    run_query = _get_graph().query_list_tags
    tags = await asyncio.to_thread(run_query, repo=params.repo)
    return tags

find_similar_models async

find_similar_models(params)

Find existing models similar to what you're building.

Compares reference overlap, column overlap, and layer placement to find models that already do something similar. Helps avoid duplicate work and suggests models to extend rather than recreate.

Source code in src/sqlprism/core/mcp_tools.py
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
@mcp.tool(
    name="find_similar_models",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def find_similar_models(params: FindSimilarModelsInput) -> dict:
    """Find existing models similar to what you're building.

    Compares reference overlap, column overlap, and layer placement to find
    models that already do something similar. Helps avoid duplicate work and
    suggests models to extend rather than recreate.
    """
    # Resolve the bound query method first, then gather its keyword arguments.
    run_query = _get_graph().query_find_similar_models
    query_kwargs = {
        "references": params.references,
        "output_columns": params.output_columns,
        "model": params.model,
        "limit": params.limit,
        "repo": params.repo,
    }
    # asyncio.to_thread executes the query in a worker thread.
    return await asyncio.to_thread(run_query, **query_kwargs)

suggest_placement async

suggest_placement(params)

Suggest where to place a new model based on its references.

Uses inferred layer flow rules and naming conventions to recommend the right layer, directory, and model name. Returns similar existing models to help avoid duplicate work.

Source code in src/sqlprism/core/mcp_tools.py
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
@mcp.tool(
    name="suggest_placement",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def suggest_placement(params: SuggestPlacementInput) -> dict:
    """Suggest where to place a new model based on its references.

    Uses inferred layer flow rules and naming conventions to recommend the
    right layer, directory, and model name. Returns similar existing models
    to help avoid duplicate work.
    """
    placement_query = _get_graph().query_suggest_placement
    # Request fields are forwarded one-to-one as keyword arguments.
    placement_args = {
        "references": params.references,
        "name": params.name,
        "repo": params.repo,
    }
    # The graph query is synchronous, so it runs on a worker thread.
    return await asyncio.to_thread(placement_query, **placement_args)

reindex async

reindex(params)

Trigger a reindex of SQL files. Checksums and re-parses only what changed.

Runs in the background so queries remain available during reindex. Supports per-repo SQL dialects and path-based dialect overrides.

Source code in src/sqlprism/core/mcp_tools.py
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
@mcp.tool(
    name="reindex",
    annotations=ToolAnnotations(
        readOnlyHint=False,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def reindex(params: ReindexInput) -> dict:
    """Trigger a reindex of SQL files. Checksums and re-parses only what changed.

    Runs in the background so queries remain available during reindex.
    Supports per-repo SQL dialects and path-based dialect overrides.
    """
    global _reindex_task, _reindex_status

    async with _reindex_lock:
        # An unfinished reindex task takes precedence: report its status
        # rather than starting a second run.
        if _reindex_task and not _reindex_task.done():
            return {"status": "in_progress", **_reindex_status}

        state = _get_state()
        indexer = state.indexer

        # Target every configured repo, or only the one named in params.repo.
        targets = state.config["repos"]
        if params.repo:
            if params.repo not in targets:
                return {"error": f"Repo '{params.repo}' not found in config"}
            targets = {params.repo: targets[params.repo]}
        repo_names = list(targets)

        _reindex_status = {
            "state": "started",
            "started_at": datetime.now().isoformat(),
            "repos": repo_names,
        }

        def _index_targets() -> dict:
            # Synchronous part, executed on a worker thread: reindex the
            # targets one by one, publishing the repo currently in progress.
            global _reindex_status
            per_repo = {}
            for repo_name in targets:
                _reindex_status = {**_reindex_status, "current_repo": repo_name}
                path, dialect, dialect_overrides = _resolve_repo_config(repo_name)
                per_repo[repo_name] = indexer.reindex_repo(
                    repo_name,
                    path,
                    dialect=dialect,
                    dialect_overrides=dialect_overrides,
                )
            return per_repo

        async def _run_in_background():
            # Drives the worker thread and records the outcome in the
            # module-level status; failures are captured, not raised.
            global _reindex_status, _last_parse_errors
            try:
                result = await asyncio.to_thread(_index_targets)
                # Flatten the parse errors reported by each repo.
                _last_parse_errors = [
                    parse_error
                    for repo_result in result.values()
                    for parse_error in repo_result.get("parse_errors", [])
                ]
                _reindex_status = {
                    **_reindex_status,
                    "state": "completed",
                    "completed_at": datetime.now().isoformat(),
                    "result": result,
                }
                return result
            except Exception as e:
                _reindex_status = {
                    **_reindex_status,
                    "state": "failed",
                    "error": str(e),
                    "failed_at": datetime.now().isoformat(),
                }

        _reindex_task = asyncio.create_task(_run_in_background())

    # The task was scheduled above; respond without awaiting it.
    return {
        "status": "started",
        "message": ("Reindex running in background. Queries remain available. Call index_status to check progress."),
        "repos": repo_names,
    }

reindex_sqlmesh async

reindex_sqlmesh(params)

Index a sqlmesh project by rendering all models into clean SQL.

Runs in the background so queries remain available during reindex. Uses sqlmesh's rendering engine to expand macros and resolve variables, then parses with sqlglot to extract tables, CTEs, edges, column lineage.

Source code in src/sqlprism/core/mcp_tools.py
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
@mcp.tool(
    name="reindex_sqlmesh",
    annotations=ToolAnnotations(
        readOnlyHint=False,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def reindex_sqlmesh(params: ReindexSqlmeshInput) -> dict:
    """Index a sqlmesh project by rendering all models into clean SQL.

    Runs in the background so queries remain available during reindex.
    Uses sqlmesh's rendering engine to expand macros and resolve variables,
    then parses with sqlglot to extract tables, CTEs, edges, column lineage.
    """
    global _reindex_task, _reindex_status

    def _int_if_possible(raw):
        # Values that int() accepts become ints; anything that raises
        # ValueError is passed through unchanged.
        try:
            return int(raw)
        except ValueError:
            return raw

    async with _reindex_lock:
        # An unfinished reindex task takes precedence: report its status
        # rather than starting a second run.
        if _reindex_task and not _reindex_task.done():
            return {"status": "in_progress", **_reindex_status}

        indexer = _get_indexer()

        # Missing or empty variables yield an empty mapping.
        var_dict: dict[str, str | int] = {
            key: _int_if_possible(raw) for key, raw in (params.variables or {}).items()
        }

        _reindex_status = {
            "state": "started",
            "started_at": datetime.now().isoformat(),
            "repos": [params.name],
            "tool": "reindex_sqlmesh",
        }

        async def _run_in_background():
            # Runs the synchronous indexer call on a worker thread and records
            # the outcome in the module-level status; failures are captured,
            # not raised.
            global _reindex_status, _last_parse_errors
            try:
                result = await asyncio.to_thread(
                    indexer.reindex_sqlmesh,
                    repo_name=params.name,
                    project_path=params.project_path,
                    env_file=params.env_file,
                    variables=var_dict,
                    dialect=params.dialect,
                    sqlmesh_command=params.sqlmesh_command,
                )
                # Parse errors are only refreshed when the result is a dict.
                if isinstance(result, dict):
                    _last_parse_errors = result.get("parse_errors", [])
                _reindex_status = {
                    **_reindex_status,
                    "state": "completed",
                    "completed_at": datetime.now().isoformat(),
                    "result": result,
                }
                return result
            except Exception as e:
                _reindex_status = {
                    **_reindex_status,
                    "state": "failed",
                    "error": str(e),
                    "failed_at": datetime.now().isoformat(),
                }

        _reindex_task = asyncio.create_task(_run_in_background())

    # The task was scheduled above; respond without awaiting it.
    return {
        "status": "started",
        "message": (
            "SQLMesh reindex running in background. Queries remain available. Call index_status to check progress."
        ),
        "repos": [params.name],
    }

reindex_dbt async

reindex_dbt(params)

Index a dbt project by compiling all models into clean SQL.

Runs in the background so queries remain available during reindex. Runs dbt compile, then parses with sqlglot to extract tables, CTEs, edges, column lineage with transforms.

Source code in src/sqlprism/core/mcp_tools.py
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
@mcp.tool(
    name="reindex_dbt",
    annotations=ToolAnnotations(
        readOnlyHint=False,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def reindex_dbt(params: ReindexDbtInput) -> dict:
    """Index a dbt project by compiling all models into clean SQL.

    Runs in the background so queries remain available during reindex.
    Runs `dbt compile`, then parses with sqlglot to extract tables, CTEs,
    edges, column lineage with transforms.
    """
    global _reindex_task, _reindex_status

    async with _reindex_lock:
        # An unfinished reindex task takes precedence: report its status
        # rather than starting a second run.
        if _reindex_task and not _reindex_task.done():
            return {"status": "in_progress", **_reindex_status}

        indexer = _get_indexer()

        _reindex_status = {
            "state": "started",
            "started_at": datetime.now().isoformat(),
            "repos": [params.name],
            "tool": "reindex_dbt",
        }

        async def _run_in_background():
            # Runs the synchronous indexer call on a worker thread and records
            # the outcome in the module-level status; failures are captured,
            # not raised.
            global _reindex_status, _last_parse_errors
            try:
                result = await asyncio.to_thread(
                    indexer.reindex_dbt,
                    repo_name=params.name,
                    project_path=params.project_path,
                    profiles_dir=params.profiles_dir,
                    env_file=params.env_file,
                    target=params.target,
                    dbt_command=params.dbt_command,
                    dialect=params.dialect,
                )
                # Parse errors are only refreshed when the result is a dict.
                if isinstance(result, dict):
                    _last_parse_errors = result.get("parse_errors", [])
                _reindex_status = {
                    **_reindex_status,
                    "state": "completed",
                    "completed_at": datetime.now().isoformat(),
                    "result": result,
                }
                return result
            except Exception as e:
                _reindex_status = {
                    **_reindex_status,
                    "state": "failed",
                    "error": str(e),
                    "failed_at": datetime.now().isoformat(),
                }

        _reindex_task = asyncio.create_task(_run_in_background())

    # The task was scheduled above; respond without awaiting it.
    return {
        "status": "started",
        "message": (
            "dbt reindex running in background. Queries remain available. Call index_status to check progress."
        ),
        "repos": [params.name],
    }

reindex_files async

reindex_files(params)

Reindex specific files after save. Non-blocking.

Fast path for on-save reindex. Accepts absolute file paths, resolves to repos, and reindexes only the affected models.

  • Plain SQL files: reindexed in ~50ms
  • dbt/sqlmesh models: compiled + reindexed in ~2-5s

Multiple rapid calls are debounced per repo. Returns immediately; reindex runs in background.

Source code in src/sqlprism/core/mcp_tools.py
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
@mcp.tool(
    name="reindex_files",
    annotations=ToolAnnotations(
        readOnlyHint=False,
        destructiveHint=False,
        idempotentHint=False,
        openWorldHint=False,
    ),
)
async def reindex_files(params: ReindexFilesInput) -> dict:
    """Reindex specific files after save. Non-blocking.

    Fast path for on-save reindex. Accepts absolute file paths,
    resolves to repos, and reindexes only the affected models.

    - Plain SQL files: reindexed in ~50ms
    - dbt/sqlmesh models: compiled + reindexed in ~2-5s

    Multiple rapid calls are debounced per repo. Returns immediately;
    reindex runs in background.
    """
    # Response shapes:
    #   {"error": ...}                                  server not configured
    #   {"accepted": 0, "skipped": n, "reason": ...}    no SQL files in params.paths
    #   {"accepted": a, "skipped": s, "queued_at": ts,  otherwise; "note" when a > 0,
    #    "note" | "reason": ...}                        "reason" when nothing matched
    #
    # Snapshot the module-level state once so the rest of the call works
    # against a single state object.
    state = _state
    if not state:
        return {"error": "Server not configured. Call configure() first."}

    sql_files = [p for p in params.paths if is_sql_file(p)]
    if not sql_files:
        return {"accepted": 0, "skipped": len(params.paths), "reason": "No SQL files in paths"}

    # The repo listing is a synchronous graph query. Run it on a worker thread
    # so this tool does not hold the event loop while the database answers.
    all_repos = await asyncio.to_thread(state.indexer.graph.get_all_repos)

    # Resolve files to repos and group by (repo_name, repo_type)
    enqueued = 0
    skipped = 0
    grouped: dict[tuple[str, str], list[str]] = defaultdict(list)
    for path in sql_files:
        resolved = state.indexer._resolve_file_repo(Path(path).resolve(), all_repos)
        if resolved:
            _repo_id, repo_name, _repo_path, repo_type = resolved
            grouped[(repo_name, repo_type)].append(path)
            enqueued += 1
        else:
            # The SQL file did not resolve to any known repo.
            skipped += 1

    # One enqueue call per (repo, type) group.
    for (repo_name, repo_type), paths in grouped.items():
        await _enqueue_reindex(repo_name, repo_type, paths)

    # "skipped" covers both non-SQL paths and SQL files without a matching repo.
    non_sql_skipped = len(params.paths) - len(sql_files)
    result: dict = {
        "accepted": enqueued,
        "skipped": skipped + non_sql_skipped,
        "queued_at": datetime.now().isoformat(),
    }
    if enqueued > 0:
        result["note"] = "Reindex queued. Check index_status for progress."
    else:
        result["reason"] = "No SQL files matched a configured repo"
    return result

index_status async

index_status()

Current state of the index — repos, file counts, last commit, staleness.

Source code in src/sqlprism/core/mcp_tools.py
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
@mcp.tool(
    name="index_status",
    annotations=ToolAnnotations(
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def index_status() -> dict:
    """Current state of the index — repos, file counts, last commit, staleness."""
    status_query = _get_graph().get_index_status
    report = await asyncio.to_thread(status_query)

    # Attach either the live progress of a running reindex or the outcome of
    # the last finished one (completed or failed).
    reindex_running = _reindex_task and not _reindex_task.done()
    if reindex_running:
        report["reindex_in_progress"] = True
        report["reindex_status"] = _reindex_status
    elif _reindex_status.get("state") in ("completed", "failed"):
        report["last_reindex"] = _reindex_status

    # Always report the error count; include at most the first 50 errors.
    recent_errors = _last_parse_errors
    report["parse_error_count"] = len(recent_errors)
    if recent_errors:
        report["last_parse_errors"] = recent_errors[:50]
    return report