Skip to content

Convention Engine

The convention inference engine detects project patterns from the indexed SQL graph — naming conventions, layer references, required columns, column style, and semantic tags.

ConventionEngine

ConventionEngine

ConventionEngine(db, repo_id)

Infers project conventions from the knowledge graph.

Takes a GraphDB instance and repo_id. Call detect_layers() and infer_naming_pattern() to analyse the graph.

Source code in src/sqlprism/core/conventions.py
106
107
108
def __init__(self, db: GraphDB, repo_id: int) -> None:
    self.db = db
    self.repo_id = repo_id

run_inference

run_inference(project_path=None)

Run all convention inference steps and store results.

Detects layers, infers naming patterns, reference rules, common columns, and column style for each layer. Upserts results into the conventions table. Then applies any YAML overrides from the project directory.

Parameters:

Name Type Description Default
project_path str | Path | None

Project directory for override file discovery. If None, no overrides are applied.

None

Returns:

Type Description
dict

Stats dict with layers_detected, conventions_stored, and overrides_applied counts.

Source code in src/sqlprism/core/conventions.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def run_inference(
    self, project_path: str | Path | None = None
) -> dict:
    """Run all convention inference steps and store results.

    Detects layers, infers naming patterns, reference rules,
    common columns, and column style for each layer, upserting
    the results into the ``conventions`` table, then applies any
    YAML overrides found in the project directory.

    Args:
        project_path: Project directory for override file discovery.
            If None, no overrides are applied.

    Returns:
        Stats dict with layers_detected, conventions_stored, and
        overrides_applied counts.
    """
    detected = self.detect_layers()
    if not detected:
        logger.debug("No layers detected for repo %d", self.repo_id)
        return {
            "layers_detected": 0,
            "conventions_stored": 0,
            "overrides_applied": 0,
        }

    rules_by_source = {
        rule.source_layer: rule
        for rule in self.infer_reference_rules(detected)
    }

    # YAML discovery is file I/O, so it runs before the write lock
    # is taken — the DB lock should not be held during disk reads.
    override_data = self.load_overrides(project_path)

    convention_count = 0
    override_count = 0
    # A single transaction covers both inference results and overrides,
    # so concurrent readers never observe partial state.
    with self.db.write_transaction():
        for layer in detected:
            # Step 2: Naming pattern (stored for every layer)
            naming = self.infer_naming_pattern(layer.model_names)
            self._store_convention(
                layer.name,
                "naming",
                {
                    "pattern": naming.pattern,
                    "matching_count": naming.matching_count,
                    "total_count": naming.total_count,
                    "exceptions": naming.exceptions,
                },
                naming.confidence,
                layer.model_count,
            )
            convention_count += 1

            # Step 3: Reference rules (only when one was inferred)
            rule = rules_by_source.get(layer.name)
            if rule:
                self._store_convention(
                    layer.name,
                    "references",
                    {
                        "allowed_targets": rule.allowed_targets,
                        "target_distribution": rule.target_distribution,
                    },
                    rule.confidence,
                    layer.model_count,
                )
                convention_count += 1

            # Step 4: Common columns (only when any cleared the threshold)
            required = self.infer_common_columns(layer)
            if required:
                column_entries = [
                    {
                        "name": col.column_name,
                        "frequency": col.frequency,
                        "source": col.source,
                        "missing_in": col.missing_in,
                    }
                    for col in required
                ]
                self._store_convention(
                    layer.name,
                    "required_columns",
                    {"columns": column_entries},
                    min(max(col.frequency for col in required), 1.0),
                    layer.model_count,
                )
                convention_count += 1

            # Step 5: Column style (skip zero-confidence classifications)
            style = self.detect_column_style(layer)
            if style.confidence > 0.0:
                self._store_convention(
                    layer.name,
                    "column_style",
                    {"style": style.style},
                    min(style.confidence, 1.0),
                    layer.model_count,
                )
                convention_count += 1

        # Overrides land inside the same transaction as inference.
        if override_data:
            override_count = self._apply_overrides_inner(override_data)

    logger.info(
        "Convention inference: %d layers, %d conventions stored, "
        "%d overrides applied",
        len(detected),
        convention_count,
        override_count,
    )
    return {
        "layers_detected": len(detected),
        "conventions_stored": convention_count,
        "overrides_applied": override_count,
    }

generate_yaml

generate_yaml()

Generate a YAML conventions file with confidence scores as comments.

Reads from the conventions table and formats the output as a human-readable YAML file with inline confidence comments. Suitable for writing to sqlprism.conventions.yml.

Source code in src/sqlprism/core/conventions.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def generate_yaml(self) -> str:
    """Generate a YAML conventions file with confidence scores as comments.

    Reads from the ``conventions`` table and formats the output
    as a human-readable YAML file with inline confidence comments.
    Suitable for writing to ``sqlprism.conventions.yml``.
    """
    records = self.db._execute_read(
        "SELECT layer, convention_type, payload, confidence, source "
        "FROM conventions WHERE repo_id = ? "
        "ORDER BY layer, convention_type",
        [self.repo_id],
    ).fetchall()

    if not records:
        return "# No conventions found. Run 'sqlprism reindex' first.\n"

    def _decode(raw):
        # Payloads may arrive as JSON strings or already-parsed dicts.
        return json.loads(raw) if isinstance(raw, str) else raw

    # Group into {layer: {convention_type: (payload, confidence, source)}}
    grouped: dict[str, dict[str, tuple]] = {}
    for layer_name, conv_type, payload, conf, source in records:
        grouped.setdefault(layer_name, {})[conv_type] = (payload, conf, source)

    out = [
        "# Auto-generated by sqlprism conventions --init",
        "# Review and adjust. Override values to set confidence to 1.0.",
        "",
        "conventions:",
    ]

    for name in sorted(grouped):
        entries = grouped[name]
        out.append(f"  {name}:")

        # Naming pattern with optional exceptions note
        if "naming" in entries:
            raw, conf, _src = entries["naming"]
            data = _decode(raw)
            pattern = data.get("pattern", "")
            exceptions = data.get("exceptions", [])
            note = f", exceptions: {exceptions}" if exceptions else ""
            out.append(
                f'    naming: "{pattern}"'
                f"  # confidence: {conf:.2f}{note}"
            )

        # Allowed reference targets, one list item per target
        if "references" in entries:
            raw, conf, _src = entries["references"]
            data = _decode(raw)
            out.append(f"    allowed_refs:  # confidence: {conf:.2f}")
            out.extend(
                f'      - "{target}"'
                for target in data.get("allowed_targets", [])
            )

        # Required columns with per-column frequency comments
        if "required_columns" in entries:
            raw, conf, _src = entries["required_columns"]
            data = _decode(raw)
            out.append(
                "    required_columns:"
                "  # frequency threshold: 0.70"
            )
            for col in data.get("columns", []):
                out.append(
                    f"      - {col.get('name', '')}"
                    f"  # frequency: {col.get('frequency', 0):.2f}"
                )

        # Dominant column naming style
        if "column_style" in entries:
            raw, conf, _src = entries["column_style"]
            style = _decode(raw).get("style", "")
            out.append(
                f'    column_style: "{style}"'
                f"  # confidence: {conf:.2f}"
            )

        out.append("")

    return "\n".join(out) + "\n"

get_diff

get_diff(yaml_path)

Compare current conventions against a YAML file.

Reports only actual differences: new/removed layers, changed naming patterns, changed references, etc.

Source code in src/sqlprism/core/conventions.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def get_diff(self, yaml_path: str | Path) -> str:
    """Compare current conventions against a YAML file.

    Reports only actual differences: new/removed layers,
    changed naming patterns, changed references, etc.
    """
    yaml_path = Path(yaml_path)
    if not yaml_path.is_file():
        return "No conventions YAML file found to compare against."

    try:
        existing = yaml.safe_load(yaml_path.read_text()) or {}
    except (yaml.YAMLError, OSError) as e:
        return f"Failed to read {yaml_path}: {e}"

    existing_convs = existing.get("conventions", {})

    # Get current conventions from DB
    rows = self.db._execute_read(
        "SELECT layer, convention_type, payload, confidence "
        "FROM conventions WHERE repo_id = ?",
        [self.repo_id],
    ).fetchall()

    current: dict[str, dict] = {}
    for layer, conv_type, payload, conf in rows:
        parsed = json.loads(payload) if isinstance(payload, str) else payload
        current.setdefault(layer, {})[conv_type] = {
            "payload": parsed,
            "confidence": conf,
        }

    changes: list[str] = []

    # New/removed layers
    current_layers = set(current)
    yaml_layers = set(existing_convs)
    for layer in sorted(current_layers - yaml_layers):
        changes.append(f"+ New layer: {layer}")
    for layer in sorted(yaml_layers - current_layers):
        changes.append(f"- Removed layer: {layer}")

    # Changed conventions in shared layers
    for layer in sorted(current_layers & yaml_layers):
        curr = current.get(layer, {})
        yaml_layer = existing_convs.get(layer, {})

        # Naming: compare DB pattern vs YAML naming string
        curr_naming = curr.get("naming", {}).get("payload", {}).get("pattern", "")
        yaml_naming = yaml_layer.get("naming", "")
        if curr_naming and not yaml_naming:
            changes.append(f"+ {layer}.naming: \"{curr_naming}\"")
        elif yaml_naming and not curr_naming:
            changes.append(f"- {layer}.naming: \"{yaml_naming}\"")
        elif curr_naming != yaml_naming:
            changes.append(
                f"  {layer}.naming: "
                f"\"{yaml_naming}\" -> \"{curr_naming}\""
            )

        # References: compare allowed_refs lists
        curr_refs = curr.get("references", {}).get("payload", {}).get("allowed_targets", [])
        yaml_refs = yaml_layer.get("allowed_refs", [])
        if sorted(curr_refs) != sorted(yaml_refs):
            if curr_refs and not yaml_refs:
                changes.append(f"+ {layer}.allowed_refs: {curr_refs}")
            elif yaml_refs and not curr_refs:
                changes.append(f"- {layer}.allowed_refs: {yaml_refs}")
            else:
                changes.append(
                    f"  {layer}.allowed_refs: {yaml_refs} -> {curr_refs}"
                )

        # Column style
        curr_style = curr.get("column_style", {}).get("payload", {}).get("style", "")
        yaml_style = yaml_layer.get("column_style", "")
        if curr_style != yaml_style and (curr_style or yaml_style):
            changes.append(
                f"  {layer}.column_style: "
                f"\"{yaml_style}\" -> \"{curr_style}\""
            )

    if not changes:
        return "No changes detected."
    return "\n".join(changes) + "\n"

load_overrides

load_overrides(project_path=None)

Load convention overrides from YAML file.

Discovery order:

1. sqlprism.conventions.yml in project_path
2. .sqlprism/sqlprism.conventions.yml in project_path

Returns parsed YAML dict, or None if no override file found.

Source code in src/sqlprism/core/conventions.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def load_overrides(
    self, project_path: str | Path | None = None
) -> dict | None:
    """Load convention overrides from YAML file.

    Discovery order:
    1. ``sqlprism.conventions.yml`` in project_path
    2. ``.sqlprism/sqlprism.conventions.yml`` in project_path

    Returns parsed YAML dict, or None if no override file found.
    """
    if project_path is None:
        return None

    project_path = Path(project_path)
    candidates = [
        project_path / "sqlprism.conventions.yml",
        project_path / ".sqlprism" / "sqlprism.conventions.yml",
    ]

    for path in candidates:
        if path.is_file():
            try:
                data = yaml.safe_load(path.read_text())
                if isinstance(data, dict):
                    logger.info("Loaded convention overrides from %s", path)
                    return data
            except (yaml.YAMLError, OSError) as e:
                logger.warning("Failed to load overrides from %s: %s", path, e)

    return None

apply_overrides

apply_overrides(overrides)

Apply explicit convention overrides to the conventions table.

Overrides replace inferred values entirely with confidence=1.0 and source='override'. Layers not in overrides keep their inferred values. Layers in overrides but not in inference are created.

Parameters:

Name Type Description Default
overrides dict

Parsed YAML dict with conventions and/or semantic_tags keys.

required

Returns:

Type Description
int

Number of override conventions stored.

Source code in src/sqlprism/core/conventions.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def apply_overrides(self, overrides: dict) -> int:
    """Apply explicit convention overrides to the conventions table.

    Overrides replace inferred values entirely with ``confidence=1.0``
    and ``source='override'``. Layers not in overrides keep their
    inferred values. Layers in overrides but not in inference are
    created.

    Args:
        overrides: Parsed YAML dict with ``conventions`` and/or
            ``semantic_tags`` keys.

    Returns:
        Number of override conventions stored.
    """
    # Public entry point: wraps the inner apply in its own write
    # transaction (run_inference invokes the inner method directly
    # inside its transaction instead).
    with self.db.write_transaction():
        applied = self._apply_overrides_inner(overrides)
    return applied

detect_layers

detect_layers()

Detect layers from directory structure.

Handles both flat (staging/, marts/) and nested (models/staging/). If all files share a common prefix dir, strips it and uses the next segment.

Source code in src/sqlprism/core/conventions.py
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
def detect_layers(self) -> list[Layer]:
    """Detect layers from directory structure.

    Handles both flat (staging/, marts/) and nested (models/staging/).
    If all files share a common prefix dir, strips it and uses the
    next segment.

    Returns:
        Layers sorted by model count (descending). Groups with fewer
        than two models are dropped, as are models whose layer key is
        empty (files sitting directly in the common prefix dir).
    """
    # All table/view models in this repo with the paths that define them.
    rows = self.db._execute_read(
        """
        SELECT DISTINCT
            f.path,
            n.name
        FROM nodes n
        JOIN files f ON n.file_id = f.file_id
        WHERE n.kind IN ('table', 'view')
          AND f.repo_id = ?
        """,
        [self.repo_id],
    ).fetchall()

    if not rows:
        return []

    # Parse directory segments from file paths
    # (Windows separators are normalised so splitting is uniform).
    path_models: list[tuple[list[str], str]] = []
    for file_path, model_name in rows:
        parts = file_path.replace("\\", "/").split("/")
        # Remove the filename, keep directory segments
        dirs = parts[:-1]
        path_models.append((dirs, model_name))

    # Detect and strip common prefix directory
    dir_segments = [dirs for dirs, _ in path_models if dirs]
    if not dir_segments:
        return []

    prefix_len = self._common_prefix_length(dir_segments)

    # If prefix stripping would leave no layer segments for ALL models,
    # back off one level so the last prefix segment becomes the layer.
    all_at_prefix = all(
        len(dirs) <= prefix_len for dirs, _ in path_models if dirs
    )
    if all_at_prefix and prefix_len > 0:
        prefix_len -= 1

    # First pass: detect domain-nested structure by checking if
    # second-level dir names repeat across different first-level dirs.
    # E.g. finance/staging + marketing/staging → domain-nested.
    # But staging/by_source + staging/manual → NOT domain-nested.
    second_level_by_first: dict[str, set[str]] = {}
    for dirs, _ in path_models:
        remaining = dirs[prefix_len:]
        if len(remaining) >= 2:
            first, second = remaining[0], remaining[1]
            second_level_by_first.setdefault(first, set()).add(second)

    # Domain-nested if same second-level name appears under 2+ first-level dirs
    all_seconds: Counter[str] = Counter()
    for seconds in second_level_by_first.values():
        for s in seconds:
            all_seconds[s] += 1
    use_nested = any(count > 1 for count in all_seconds.values())

    # Group models by their layer directory
    layer_groups: dict[str, list[str]] = {}
    for dirs, model_name in path_models:
        if len(dirs) <= prefix_len:
            layer_key = ""
        else:
            remaining = dirs[prefix_len:]
            layer_key = remaining[0]

            # Use two-segment key only for domain-nested structures.
            # Note: models at depth 1 (e.g. finance/stg_flat.sql) keep
            # the flat key while depth 2+ get nested keys. This is
            # acceptable — mixed-depth within a domain dir is rare.
            if use_nested and len(remaining) > 1:
                layer_key = "/".join(remaining[:2])

        if layer_key:
            layer_groups.setdefault(layer_key, []).append(model_name)

    # Collapse domain-nested layers if sub-layer names repeat
    # (e.g. finance/staging + marketing/staging → staging)
    layer_groups = self._collapse_domain_layers(layer_groups)

    # Build Layer objects, skip groups with < 2 models
    layers: list[Layer] = []
    for name, models in sorted(
        layer_groups.items(), key=lambda x: len(x[1]), reverse=True
    ):
        if len(models) < 2:
            continue
        confidence = self._layer_confidence(len(models))
        # Reconstruct the stripped prefix so path_pattern stays rooted
        # at the repo root rather than at the stripped layer dir.
        prefix = "/".join(dir_segments[0][:prefix_len]) if dir_segments else ""
        path_pat = f"{prefix}/{name}/**" if prefix else f"{name}/**"
        layers.append(
            Layer(
                name=name,
                path_pattern=path_pat,
                model_count=len(models),
                model_names=sorted(models),
                confidence=confidence,
            )
        )

    return layers

infer_naming_pattern

infer_naming_pattern(model_names)

Infer naming pattern from a list of model names.

Tokenizes by _, finds common prefixes, classifies variable segments, and builds a pattern template.

Source code in src/sqlprism/core/conventions.py
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
def infer_naming_pattern(
    self, model_names: list[str]
) -> NamingPattern:
    """Infer naming pattern from a list of model names.

    Tokenizes by ``_``, finds common prefixes, classifies variable
    segments, and builds a pattern template.

    Args:
        model_names: Model names belonging to one layer.

    Returns:
        NamingPattern with the inferred template, a confidence score
        (capped at 0.6 when fewer than five names are given), match
        counts, and the sorted names that do not fit the pattern.
    """
    if not model_names:
        # No data — return an empty pattern with zero confidence.
        return NamingPattern(
            pattern="",
            confidence=0.0,
            matching_count=0,
            total_count=0,
        )

    total = len(model_names)

    # Tokenize all names
    tokenized = [name.split("_") for name in model_names]

    # Find common prefix tokens
    prefix_tokens = self._find_common_prefix_tokens(tokenized)
    # e.g. prefix tokens ["stg"] become the string "stg_"
    prefix = "_".join(prefix_tokens) + "_" if prefix_tokens else ""

    # Strip prefix and analyse remaining tokens
    stripped = []
    matching_prefix = 0
    exceptions = []
    for name, tokens in zip(model_names, tokenized):
        if prefix_tokens and tokens[: len(prefix_tokens)] == prefix_tokens:
            stripped.append(tokens[len(prefix_tokens) :])
            matching_prefix += 1
        else:
            # Name lacks the common prefix: keep its full token list
            # and record it as an exception.
            stripped.append(tokens)
            exceptions.append(name)

    # Classify variable segments by position
    if prefix_tokens:
        # With prefix: analyse segment structure after prefix
        segment_counts = Counter(len(s) for s in stripped if s)
        if segment_counts:
            most_common_len = segment_counts.most_common(1)[0][0]
        else:
            most_common_len = 0

        # Build pattern from prefix + variable segments
        var_labels = self._classify_segments(stripped, most_common_len)
        pattern = prefix + "_".join(f"{{{v}}}" for v in var_labels)
        confidence = matching_prefix / total if total > 0 else 0.0
    else:
        # No common prefix — try to find structural patterns
        segment_counts = Counter(len(tokens) for tokens in tokenized)
        if segment_counts:
            most_common_len = segment_counts.most_common(1)[0][0]
        else:
            most_common_len = 0

        var_labels = self._classify_segments(tokenized, most_common_len)
        pattern = "_".join(f"{{{v}}}" for v in var_labels)
        # Lower confidence without a clear prefix (scaled by 0.7)
        matching_structure = sum(
            1 for tokens in tokenized if len(tokens) == most_common_len
        )
        confidence = (
            matching_structure / total * 0.7 if total > 0 else 0.0
        )
        # Models not matching the dominant structure are exceptions
        exceptions = [
            name
            for name, tokens in zip(model_names, tokenized)
            if len(tokens) != most_common_len
        ]

    # Cap confidence for small samples
    if total < 5:
        confidence = min(confidence, 0.6)

    return NamingPattern(
        pattern=pattern,
        confidence=round(confidence, 2),
        matching_count=total - len(exceptions),
        total_count=total,
        exceptions=sorted(exceptions),
    )

infer_reference_rules

infer_reference_rules(layers)

Infer layer-to-layer reference rules from edges.

For each source layer, computes what percentage of references go to each target layer. High concentration → high confidence.

Source code in src/sqlprism/core/conventions.py
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
def infer_reference_rules(
    self, layers: list[Layer]
) -> list[ReferenceRule]:
    """Infer layer-to-layer reference rules from edges.

    For each source layer, computes what percentage of references
    go to each target layer. High concentration → high confidence.
    """
    if not layers:
        return []

    # Map every model name to the layer that owns it.
    layer_of = {
        model: layer.name
        for layer in layers
        for model in layer.model_names
    }

    # Edges between table/view nodes in this repo. Targets may be
    # phantom nodes (file_id IS NULL), hence the LEFT JOIN that
    # keeps unresolved targets in the result.
    edge_rows = self.db._execute_read(
        """
        SELECT
            sn.name AS source_name,
            tn.name AS target_name
        FROM edges e
        JOIN nodes sn ON e.source_id = sn.node_id
        JOIN nodes tn ON e.target_id = tn.node_id
        JOIN files sf ON sn.file_id = sf.file_id
        LEFT JOIN files tf ON tn.file_id = tf.file_id
        WHERE sf.repo_id = ?
          AND (tf.repo_id = ? OR tn.file_id IS NULL)
          AND e.relationship IN ('references', 'cte_references')
        """,
        [self.repo_id, self.repo_id],
    ).fetchall()

    # Tally cross-layer references per source layer.
    tallies: dict[str, Counter[str]] = {}
    for source_name, target_name in edge_rows:
        source_layer = layer_of.get(source_name)
        target_layer = layer_of.get(target_name)
        # Drop edges whose endpoints fall outside any known layer,
        # and within-layer references (e.g. CTE self-refs).
        if source_layer is None or target_layer is None:
            continue
        if source_layer == target_layer:
            continue
        tallies.setdefault(source_layer, Counter())[target_layer] += 1

    rules: list[ReferenceRule] = []
    for layer in layers:
        counter = tallies.get(layer.name)
        if not counter:
            continue
        total = sum(counter.values())
        if total == 0:
            continue

        distribution = {
            target: round(n / total, 2)
            for target, n in counter.most_common()
        }
        # Confidence = share of references going to the dominant target.
        _dominant, dominant_count = counter.most_common(1)[0]

        rules.append(
            ReferenceRule(
                source_layer=layer.name,
                # Targets receiving at least 10% of references are allowed.
                allowed_targets=[
                    target for target, share in distribution.items()
                    if share >= 0.1
                ],
                target_distribution=distribution,
                confidence=round(dominant_count / total, 2),
            )
        )

    return rules

infer_common_columns

infer_common_columns(layer, threshold=0.7)

Detect columns appearing in >=threshold fraction of models.

Merges two sources: columns table (definitions, more authoritative) and column_usage table (usage in SELECT).

Source code in src/sqlprism/core/conventions.py
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
def infer_common_columns(
    self,
    layer: Layer,
    threshold: float = 0.7,
) -> list[RequiredColumn]:
    """Detect columns appearing in >=threshold fraction of models.

    Merges two sources: ``columns`` table (definitions, more
    authoritative) and ``column_usage`` table (usage in SELECT).

    Args:
        layer: Layer whose models are scanned for shared columns.
        threshold: Minimum fraction of the layer's models a column
            must appear in to be reported.

    Returns:
        RequiredColumn entries for every column at or above the
        threshold, sorted by column name.
    """
    if not layer.model_names:
        return []
    # The guard above guarantees model_count >= 1, so dividing by it
    # below is safe (the previous redundant zero check was removed).
    model_count = len(layer.model_names)

    # Both queries filter by the same model-name list; build the
    # parameter placeholders and bind values once.
    placeholders = ",".join(["?"] * len(layer.model_names))
    params = [self.repo_id, *layer.model_names]

    # Columns from definitions (columns table)
    def_rows = self.db._execute_read(
        """
        SELECT
            c.column_name,
            COUNT(DISTINCT c.node_id) AS usage_count
        FROM columns c
        JOIN nodes n ON c.node_id = n.node_id
        JOIN files f ON n.file_id = f.file_id
        WHERE f.repo_id = ?
          AND n.name IN ({placeholders})
        GROUP BY c.column_name
        """.format(placeholders=placeholders),
        params,
    ).fetchall()

    def_freq: dict[str, float] = {
        col: min(count / model_count, 1.0) for col, count in def_rows
    }

    # Columns from usage (column_usage table, SELECT only)
    usage_rows = self.db._execute_read(
        """
        SELECT
            cu.column_name,
            COUNT(DISTINCT cu.node_id) AS usage_count
        FROM column_usage cu
        JOIN nodes n ON cu.node_id = n.node_id
        JOIN files f ON n.file_id = f.file_id
        WHERE f.repo_id = ?
          AND n.name IN ({placeholders})
          AND cu.usage_type = 'select'
        GROUP BY cu.column_name
        """.format(placeholders=placeholders),
        params,
    ).fetchall()

    usage_freq: dict[str, float] = {
        col: min(count / model_count, 1.0) for col, count in usage_rows
    }

    # Merge: definition is more authoritative for the `source` label;
    # the higher of the two frequencies decides threshold membership.
    results = []
    for col in sorted(set(def_freq) | set(usage_freq)):
        freq = max(def_freq.get(col, 0.0), usage_freq.get(col, 0.0))
        if freq < threshold:
            continue

        if col in def_freq and col in usage_freq:
            source = "both"
        elif col in def_freq:
            source = "definition"
        else:
            source = "usage"

        # Record which models lack this column, for reporting.
        missing = self._find_models_missing_column(
            layer.model_names, col
        )

        results.append(
            RequiredColumn(
                column_name=col,
                frequency=round(freq, 2),
                source=source,
                missing_in=missing,
            )
        )

    return results

detect_column_style

detect_column_style(layer)

Classify dominant column naming convention in a layer.

Checks: snake_case, camelCase, PascalCase, SCREAMING_SNAKE.

Source code in src/sqlprism/core/conventions.py
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
def detect_column_style(
    self, layer: Layer
) -> ColumnStyle:
    """Determine the prevailing column naming convention for a layer.

    Candidate styles: snake_case, camelCase, PascalCase,
    SCREAMING_SNAKE.
    """
    model_names = layer.model_names
    if not model_names:
        # No models to inspect: report the default style with zero confidence.
        return ColumnStyle(style="snake_case", confidence=0.0)

    marks = ",".join(["?"] * len(model_names))
    params = [self.repo_id, *model_names]

    # Prefer column definitions recorded for models in this layer.
    defined = self.db._execute_read(
        """
        SELECT DISTINCT c.column_name
        FROM columns c
        JOIN nodes n ON c.node_id = n.node_id
        JOIN files f ON n.file_id = f.file_id
        WHERE f.repo_id = ?
          AND n.name IN ({placeholders})
        """.format(placeholders=marks),
        params,
    ).fetchall()

    names = [row[0] for row in defined]
    if not names:
        # No definitions found -- fall back to observed column usage.
        used = self.db._execute_read(
            """
            SELECT DISTINCT cu.column_name
            FROM column_usage cu
            JOIN nodes n ON cu.node_id = n.node_id
            JOIN files f ON n.file_id = f.file_id
            WHERE f.repo_id = ?
              AND n.name IN ({placeholders})
            """.format(placeholders=marks),
            params,
        ).fetchall()
        names = [row[0] for row in used]

    return self._classify_column_style(names)

infer_semantic_tags

infer_semantic_tags(threshold=0.5, existing_tags=None)

Cluster models by shared upstream references and auto-label.

Pipeline:

1. Query upstream refs per model.
2. Agglomerative clustering by Jaccard similarity.
3. Auto-label each cluster from its most frequent name token.
4. Score per-model confidence.

Parameters:

threshold (float, default 0.5):
    Jaccard similarity threshold for merging clusters.

existing_tags (list[TagAssignment] | None, default None):
    Previously assigned tags for stability check.

Returns:

list[TagAssignment]:
    List of TagAssignment objects, one per model in a cluster.

Source code in src/sqlprism/core/conventions.py
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
def infer_semantic_tags(
    self,
    threshold: float = 0.5,
    existing_tags: list[TagAssignment] | None = None,
) -> list[TagAssignment]:
    """Cluster models by shared upstream references and auto-label.

    The steps are:

    1. Collect upstream reference sets per model.
    2. Merge models into clusters by Jaccard similarity.
    3. Derive each cluster's label from its most frequent name token.
    4. Score a per-model confidence for the assigned tag.

    Args:
        threshold: Jaccard similarity threshold for merging clusters.
        existing_tags: Previously assigned tags for stability check.

    Returns:
        One ``TagAssignment`` per model that landed in a labelled
        cluster.
    """
    ref_sets = self._get_model_references()
    if len(ref_sets) < 5:
        # Too few models for clustering to be meaningful.
        logger.debug(
            "Skipping semantic tags: repo %d has < 5 models with refs",
            self.repo_id,
        )
        return []

    clusters = self._agglomerative_cluster(ref_sets, threshold)

    # Index prior assignments by node for the stability check below.
    previous: dict[int, TagAssignment] = {
        t.node_id: t for t in (existing_tags or [])
    }

    result: list[TagAssignment] = []
    for cluster in clusters:
        label, _label_conf = self._label_cluster(cluster)
        if not label:
            # Unlabelled clusters contribute no assignments.
            continue

        for node_id, model_name in cluster.members:
            score = self._compute_member_confidence(
                node_id, model_name, label, cluster,
            )

            # Stability: if the model carried a different tag before and
            # still fits that old cluster, keep the old assignment.
            old = previous.get(node_id)
            if old is not None and old.tag_name != label:
                if self._check_tag_stability(
                    node_id, old.tag_name, clusters, ref_sets, threshold,
                ):
                    result.append(old)
                    continue

            result.append(
                TagAssignment(
                    tag_name=label,
                    node_id=node_id,
                    model_name=model_name,
                    confidence=round(min(score, 1.0), 2),
                    source="inferred",
                )
            )

    return result

Data Classes

Layer dataclass

Layer(
    name,
    path_pattern,
    model_count,
    model_names=list(),
    confidence=0.6,
)

A detected project layer (e.g. staging, intermediate, marts).

NamingPattern dataclass

NamingPattern(
    pattern,
    confidence,
    matching_count,
    total_count,
    exceptions=list(),
)

An inferred naming convention for a layer.

ReferenceRule dataclass

ReferenceRule(
    source_layer,
    allowed_targets,
    target_distribution,
    confidence,
)

Inferred reference rule: which layers a source layer references.

RequiredColumn dataclass

RequiredColumn(
    column_name, frequency, source, missing_in=list()
)

A column that appears frequently in a layer.

ColumnStyle dataclass

ColumnStyle(style, confidence)

Dominant column naming convention for a layer.

TagAssignment dataclass

TagAssignment(
    tag_name, node_id, model_name, confidence, source
)

A semantic tag assigned to a model via structural clustering.

Cluster dataclass

Cluster(members, ref_sets)

A group of models with similar upstream references.