Snowflake Cortex Inference, Cortex + Ibis: AI Enrichment, Vector Search & Distribution Analysis

Cortex LLM
Priya Joseph

A hands-on guide to Snowflake Cortex AI functions via Ibis, covering the full pipeline from LLM enrichment and vector embeddings to TurboPuffer hybrid search, Shannon entropy distribution analysis, and installing reusable Cortex Code (CoCo) skills. It also covers direct Cortex Inference REST API access with PAT/JWT auth, SSE streaming, and tool calling.

All examples use the cortex_rest.py client included in this workspace.


Architecture

Python client (httpx)
  ├── Auth: PAT  → Authorization: Bearer <token>
  │               X-Snowflake-Authorization-Token-Type: PROGRAMMATIC_ACCESS_TOKEN
  └── Auth: JWT  → Authorization: Bearer <signed_jwt>
                   X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT
       ↓
POST https://<account>.snowflakecomputing.com/api/v2/cortex/inference:complete
       ↓
  Snowflake Cortex (claude-4-sonnet, llama3.3-70b, mistral-large2, ...)
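The request shape in the diagram can be sketched with the standard library alone. The helper below is hypothetical (build_request is not part of cortex_rest.py). It only assembles the URL and the two auth headers shown above; the Content-Type header is an assumption.

```python
from dataclasses import dataclass

# Token-type header values taken from the architecture diagram.
TOKEN_TYPES = {
    "pat": "PROGRAMMATIC_ACCESS_TOKEN",
    "jwt": "KEYPAIR_JWT",
}


@dataclass(frozen=True)
class CortexRequest:
    url: str
    headers: dict


def build_request(account: str, token: str, auth: str = "pat") -> CortexRequest:
    """Assemble the endpoint URL and auth headers (hypothetical helper)."""
    if auth not in TOKEN_TYPES:
        raise ValueError(f"auth must be one of {sorted(TOKEN_TYPES)}")
    url = f"https://{account}.snowflakecomputing.com/api/v2/cortex/inference:complete"
    headers = {
        "Authorization": f"Bearer {token}",
        "X-Snowflake-Authorization-Token-Type": TOKEN_TYPES[auth],
        "Content-Type": "application/json",  # assumption: JSON request body
    }
    return CortexRequest(url=url, headers=headers)


req = build_request("myorg-myaccount", "<token>", auth="pat")
print(req.url)
print(req.headers["X-Snowflake-Authorization-Token-Type"])
```

Switching between PAT and JWT changes only the token and the token-type header, which is why a single client can support both.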

Prerequisites

pip install httpx PyJWT cryptography rich
  • Snowflake account with Cortex enabled.
  • PAT (Programmatic Access Token) in ~/.snowflake/config.toml, or an RSA key pair registered on your user.

All runnable code lives under assets/. Run scripts from that directory:

cd assets
python distribution_demo.py
python test_cortex_rest.py

Section 0 — Auth Check

Verifies that the PAT loads correctly from ~/.snowflake/config.toml and shows the endpoint that will be called.

from cortex_rest import _load_pat

host, token = _load_pat()           # reads connections.myaccount.password
print(host)                          # e.g. myorg-myaccount.snowflakecomputing.com
print(token[:8] + "...")             # eyJraWQi...

Section 1 — Simple Single-Turn Complete

Non-streaming completion with token usage stats.

from cortex_rest import CortexInferenceClient

client = CortexInferenceClient()     # auto PAT from config.toml
resp = client.complete(
    "claude-4-sonnet",
    [{"role": "user", "content": "In one sentence: what is Snowflake Cortex?"}],
    max_tokens=150,
)
print(resp["choices"][0]["message"]["content"])
print(resp["usage"])   # {'prompt_tokens': 21, 'completion_tokens': 51, 'total_tokens': 72}

Screenshot: Simple complete

Section 2 — Multi-Turn Conversation

Pass the full conversation history on every call. The endpoint is stateless, so the model only sees the context you send.

messages = [
    {"role": "user",      "content": "My name is Ada. What's 12 × 12?"},
    {"role": "assistant", "content": "12 × 12 = 144."},
    {"role": "user",      "content": "What's my name and what was the answer?"},
]
resp = client.complete("claude-4-sonnet", messages, max_tokens=120)
# → "Your name is Ada, and the answer to 12 × 12 was 144."
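Because each call must carry the whole history, a small wrapper that accumulates messages can help. This is a minimal sketch: the Conversation class is hypothetical and not part of cortex_rest.py, and complete_fn stands in for a real call such as client.complete(...). A fake model is used so the example runs offline.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


class Conversation:
    """Accumulates chat history so every call sends the full context."""

    def __init__(self, complete_fn: Callable[[List[Message]], str]):
        # complete_fn is a stand-in for a function that sends the messages
        # to the model and returns the assistant's reply text.
        self._complete_fn = complete_fn
        self.messages: List[Message] = []

    def ask(self, text: str) -> str:
        self.messages.append({"role": "user", "content": text})
        reply = self._complete_fn(list(self.messages))
        self.messages.append({"role": "assistant", "content": reply})
        return reply


def fake_model(msgs: List[Message]) -> str:
    """Offline stand-in that reports how many messages it received."""
    return f"saw {len(msgs)} message(s)"


chat = Conversation(fake_model)
print(chat.ask("My name is Ada. What's 12 x 12?"))
print(chat.ask("What's my name and what was the answer?"))
```

The second call sees three messages (user, assistant, user), which is the same shape as the messages list above.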

Screenshot: Multi-turn

Section 3 — Streaming SSE Response

In streaming mode the endpoint returns Server-Sent Events (SSE). complete_stream() parses the data: lines and yields the chunks as they arrive.

for chunk in client.complete_stream(
    "claude-4-sonnet",
    [{"role": "user", "content": "Count slowly from 1 to 5, one number per line."}],
    max_tokens=80,
):
    delta = chunk["choices"][0]["delta"]
    text  = delta.get("content") or delta.get("text", "")
    print(text, end="", flush=True)

# Raw SSE shape:
# data: {"id":"...", "model":"claude-4-sonnet",
#        "choices":[{"delta":{"type":"text","content":"1\n","text":"1\n"}}], "usage":{}}
# data: {"id":"...", ..., "usage":{"prompt_tokens":15,"completion_tokens":20,"total_tokens":35}}
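The parsing step can be sketched without a network call. The two helpers below are hypothetical and are not the code inside complete_stream(). They assume every payload of interest arrives on a single data: line containing JSON, as in the raw shape above, and they silently skip anything else.

```python
import json
from typing import Iterable, Iterator


def iter_sse_json(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of each `data:` line; skip everything else."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # blank separators, comments, other SSE fields
        payload = line[len("data:"):].strip()
        try:
            yield json.loads(payload)
        except json.JSONDecodeError:
            continue  # assumption: non-JSON sentinels can be ignored


def chunk_text(event: dict) -> str:
    """Extract the text delta, using the same content/text fallback as above."""
    choices = event.get("choices") or []
    if not choices:
        return ""  # e.g. the final usage-only event
    delta = choices[0].get("delta", {})
    return delta.get("content") or delta.get("text") or ""


sample = [
    'data: {"id":"x","choices":[{"delta":{"type":"text","content":"1\\n","text":"1\\n"}}],"usage":{}}',
    "",
    'data: {"id":"x","choices":[{"delta":{"type":"text","content":"2\\n","text":"2\\n"}}],"usage":{}}',
    "",
    'data: {"id":"x","usage":{"prompt_tokens":15,"completion_tokens":20,"total_tokens":35}}',
]
print("".join(chunk_text(e) for e in iter_sse_json(sample)), end="")
```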

Screenshot: Streaming

Section 4 — Tool Calling (Function Calling)

Snowflake uses tool_spec format. Here the model is asked to draft a customer-service reply to a defective-product review — but first it must call get_product_details(product_id) to look up the product's metadata. This mirrors the product_id field in the cortex_ibis reviews dataset (P001–P005).

tools = [{
    "tool_spec": {
        "type": "generic",
        "name": "get_product_details",
        "description": "Look up product metadata (name, category, price) by product_id from the product catalog.",
        "input_schema": {
            "type": "object",
            "properties": {
                "product_id": {"type": "string", "description": "Product identifier, e.g. P001, P002."},
            },
            "required": ["product_id"],
        },
    }
}]

# Review from the turbopuffer_demo dataset — product P003, defective unit
messages = [{
    "role": "user",
    "content": (
        "A customer left this review for product P003: "
        "'Defective unit out of the box. USB port doesn't work at all.' "
        "Before drafting a response, look up the product details for P003."
    ),
}]

resp = client.complete("claude-4-sonnet", messages, tools=tools, max_tokens=250)

# Model responds with a tool_use call:
# [{"type": "tool_use", "tool_use": {"name": "get_product_details", "input": {"product_id": "P003"}}}]
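Once the model returns a tool_use item, the caller has to run the tool locally. The sketch below shows one way to dispatch such calls. It is hypothetical: CATALOG holds made-up values, TOOL_REGISTRY and run_tool_calls are not part of cortex_rest.py, and sending the result back to the model is left out because the guide does not show that message format.

```python
from typing import Any, Callable, Dict, List

# Made-up catalog standing in for a real product table.
CATALOG = {"P003": {"name": "USB hub", "category": "accessories", "price": 24.99}}


def get_product_details(product_id: str) -> Dict[str, Any]:
    """Local implementation of the tool advertised in tool_spec."""
    return CATALOG.get(product_id, {"error": f"unknown product_id {product_id}"})


# Maps tool names from tool_spec to local Python callables.
TOOL_REGISTRY: Dict[str, Callable[..., Any]] = {
    "get_product_details": get_product_details,
}


def run_tool_calls(content_list: List[dict]) -> List[dict]:
    """Execute each tool_use item and return name/input/output records."""
    results = []
    for item in content_list:
        if item.get("type") != "tool_use":
            continue  # plain text items are ignored here
        call = item["tool_use"]
        args = call.get("input", {})
        fn = TOOL_REGISTRY.get(call["name"])
        if fn is None:
            output = {"error": f"no local handler for {call['name']}"}
        else:
            output = fn(**args)
        results.append({"name": call["name"], "input": args, "output": output})
    return results


content_list = [
    {"type": "text", "text": "I'll look that up."},
    {"type": "tool_use", "tool_use": {"name": "get_product_details", "input": {"product_id": "P003"}}},
]
print(run_tool_calls(content_list))
```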

Screenshot: Tool calling

Section 5 — Temperature & Sampling Parameters

claude-4-sonnet accepts temperature, top_p, top_k. Note: these parameters were removed for claude-opus-4-7 and newer (any non-default value returns 400).

# Works for claude-4-sonnet:
resp = client.complete(
    "claude-4-sonnet",
    [{"role": "user", "content": "ping"}],
    max_tokens=20,
    temperature=0.7,
)

# For Opus 4.7 — strip the param before sending:
#   payload.pop("temperature", None)   # in pre_call_hook.py
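The stripping step can be wrapped in a small function so callers do not need to remember which models reject sampling parameters. This is a sketch under one stated assumption: affected models are recognised by a name prefix. The prefix tuple below only covers claude-opus-4-7, so newer models would have to be added by hand.

```python
from typing import Any, Dict

SAMPLING_KEYS = ("temperature", "top_p", "top_k")

# Assumption: models that reject sampling parameters share a known name prefix.
NO_SAMPLING_PREFIXES = ("claude-opus-4-7",)


def sanitize_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the payload, dropping sampling keys when the model rejects them."""
    model = str(payload.get("model", ""))
    if not model.startswith(NO_SAMPLING_PREFIXES):
        return dict(payload)
    return {k: v for k, v in payload.items() if k not in SAMPLING_KEYS}


print(sanitize_payload({"model": "claude-opus-4-7", "temperature": 0.7, "max_tokens": 20}))
print(sanitize_payload({"model": "claude-4-sonnet", "temperature": 0.7, "max_tokens": 20}))
```

Returning a copy keeps the caller's original payload intact, which makes retries with a different model simpler.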

Screenshot: Temperature

Section 6 — Error Handling

A bad model name returns HTTP 400 with a JSON error body. httpx raises HTTPStatusError; catch it to extract the message.

import httpx
from cortex_rest import CortexInferenceClient

client = CortexInferenceClient()
try:
    client.complete("this-model-does-not-exist", [{"role":"user","content":"ping"}])
except httpx.HTTPStatusError as exc:
    print(exc.response.status_code)    # 400
    print(exc.response.json())         # {"message": "unknown model \"this-model-does-not-exist\"", ...}

Screenshot: Error handling

JWT Key-Pair Auth

To use key-pair JWT instead of a PAT (e.g. in CI or service accounts):

# 1. Generate RSA key pair (once)
#    openssl genrsa -out snowflake_rsa_key.p8 2048
#    openssl rsa -in snowflake_rsa_key.p8 -pubout -out snowflake_rsa_key.pub
#
# 2. Register the public key on your Snowflake user:
#    ALTER USER <user> SET RSA_PUBLIC_KEY='<contents of .pub, without the BEGIN/END lines>';
#
# 3. Use JWT auth:
client = CortexInferenceClient(
    auth="jwt",
    account="myorg-myaccount",
    user="<user>",
    private_key_path="~/.ssh/snowflake_rsa_key.p8",
)
resp = client.complete("claude-4-sonnet", [{"role": "user", "content": "ping"}])

The _build_jwt() function in cortex_rest.py signs a short-lived JWT (default 60 min) using PyJWT + RSA, computing the public key fingerprint as SHA256:<base64>.
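The fingerprint and claims can be sketched with the standard library. This is not the code in _build_jwt(); the function names are hypothetical, and the claim layout (iss = ACCOUNT.USER.SHA256:fingerprint, sub = ACCOUNT.USER, both upper-cased) is assumed to follow Snowflake's key-pair authentication documentation. Converting the PEM key to DER bytes and signing the token are left out because they need the cryptography and PyJWT packages.

```python
import base64
import hashlib
import time


def public_key_fingerprint(der_public_key: bytes) -> str:
    """SHA-256 of the DER-encoded public key, base64-encoded, prefixed with SHA256:."""
    digest = hashlib.sha256(der_public_key).digest()
    return "SHA256:" + base64.b64encode(digest).decode("ascii")


def jwt_claims(account: str, user: str, fingerprint: str, lifetime_s: int = 3600) -> dict:
    """Build the claim set; account and user are upper-cased (assumed convention)."""
    qualified_user = f"{account.upper()}.{user.upper()}"
    now = int(time.time())
    return {
        "iss": f"{qualified_user}.{fingerprint}",
        "sub": qualified_user,
        "iat": now,
        "exp": now + lifetime_s,  # 60 minutes by default, matching the text above
    }


# Placeholder bytes; in practice these are the real public key in DER form.
fp = public_key_fingerprint(b"not-a-real-key")
claims = jwt_claims("myorg-myaccount", "svc_user", fp)
print(claims["iss"])
```

With PyJWT installed, the claims would then be signed with something like jwt.encode(claims, private_key, algorithm="RS256").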


Module-Level Convenience Functions

For quick scripts that don't need the full client:

from cortex_rest import complete, stream

# One-shot
print(complete("What is 2+2?"))

# Streaming (yields text chunks)
for chunk in stream("Explain vector search in two sentences."):
    print(chunk, end="", flush=True)

Distribution Analysis — Shannon Entropy

Shannon entropy measures how diverse a product's review categories are. A product with all defect reviews has low entropy (focused root cause). A product with mixed billing/delivery/defect/positive reviews has high entropy (needs broad support coverage).
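For reference, Shannon entropy in bits is H = -sum(p_i * log2(p_i)) over the category shares p_i, and the normalized value divides H by log2(K) for K categories. The sketch below computes both from raw counts. The function names are hypothetical and independent of the helpers in cortex_ibis.py.

```python
import math
from typing import Sequence


def shannon_entropy_bits(counts: Sequence[int]) -> float:
    """Entropy in bits of the distribution implied by the counts."""
    total = sum(counts)
    if total == 0:
        return 0.0
    probs = [c / total for c in counts if c > 0]  # empty categories contribute 0
    return sum(-p * math.log2(p) for p in probs)


def normalized_shannon(counts: Sequence[int], num_categories: int) -> float:
    """Entropy divided by its maximum, log2(num_categories), giving a value in [0, 1]."""
    if num_categories < 2:
        return 0.0
    return shannon_entropy_bits(counts) / math.log2(num_categories)


# Counts per category: billing, delivery, defect, positive
profiles = {
    "uniform": [25, 25, 25, 25],
    "90% in one category": [0, 0, 90, 10],
    "single category": [0, 0, 0, 100],
}
for label, counts in profiles.items():
    h = shannon_entropy_bits(counts)
    print(f"{label}: H={h:.2f} bits, normalized={normalized_shannon(counts, 4):.2f}")
```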

Synthetic dataset entropy profiles:

| Product | Profile | H (bits) | Norm H |
|---|---|---|---|
| P004 | All positive | 0.00 | 0.00 |
| P002 | 90% product defect | 0.45 | 0.23 |
| P007 | 90% billing | 0.80 | 0.40 |
| P005 | Bimodal delivery+defect | 0.97 | 0.49 |
| P008 | Trimodal billing/delivery/defect | 1.16 | 0.58 |
| P003 | Bimodal billing+delivery | 1.82 | 0.91 |
| P001 | Uniform (all 4 categories) | 1.84 | 0.92 |
| P006 | Slight positive skew | 1.88 | 0.94 |

Screenshot: Shannon entropy distribution analysis

File Layout

cortex-inference-ibis-integration-skills/
├── cortex-inference-ibis-integration-skills.md   ← this guide
└── assets/
    ├── SKILL.md                  ← Cortex Code skill entry point (routing table)
    ├── cortex_ibis.py            ← Ibis UDFs for all Cortex AI functions (SQL path)
    ├── cortex_rest.py            ← Direct REST client — PAT/JWT, streaming, tool calling
    ├── demo.py                   ← End-to-end Ibis enrichment walkthrough
    ├── distribution_demo.py      ← Shannon entropy & category distribution analysis
    ├── synthetic_data.py         ← 300-row synthetic review dataset (8 products, Dirichlet)
    ├── turbopuffer_demo.py       ← Cortex + Ibis + TurboPuffer pipeline
    ├── test_cortex_rest.py       ← 7-section validation suite + SVG screenshot capture
    ├── __init__.py
    ├── requirements.txt
    ├── s1_simple_complete.svg
    ├── s2_multi_turn.svg
    ├── s3_streaming.svg
    ├── s4_tool_calling.svg
    ├── s5_temperature.svg
    ├── s6_error_handling.svg
    └── s_entropy.svg             ← Shannon entropy bar chart output

Cortex Code Skill

assets/SKILL.md and the reference sections in this file also ship as a Cortex Code (CoCo) skill. Install it once and any CoCo session will auto-invoke it for Cortex + Ibis questions.

# Install the skill into Cortex Code
cortex skill add /path/to/assets

# Verify
cortex skill list | grep cortex-ibis

The skill routes to the right reference section based on intent:

| Ask about | Routes to |
|---|---|
| AI_*, SNOWFLAKE.CORTEX.*, .mutate() | Cortex + Ibis API Reference |
| EnrichmentPipeline, fluent chain | EnrichmentPipeline Reference |
| Embeddings, EMBED_TEXT_768/1024 | Embeddings Reference |
| Semantic search, vector similarity | Semantic Search Reference |
| REST API, CortexInferenceClient, PAT/JWT, streaming | Cortex REST API Reference |
| TurboPuffer, ANN, BM25, hybrid search | TurboPuffer Integration Reference |

Troubleshooting

| Symptom | Cause | Fix |
|---|---|---|
| 401 Unauthorized | PAT expired or wrong token | Regenerate PAT in Snowsight |
| 400: temperature is deprecated | Model is Opus 4.7+ | Remove temperature/top_p/top_k from payload |
| 400: unknown model | Model name typo or unavailable in region | Check CURRENT_REGION() and use claude-4-sonnet |
| Tunnel connection failed: 403 | Running inside sandboxed env | Use dangerously_disable_sandbox=True or run outside the sandbox |
| KEYPAIR_JWT 401 | Wrong account/user in JWT issuer | Match CURRENT_ACCOUNT() / CURRENT_USER() |

Cortex + Ibis API Reference

All functions live in cortex_ibis.py. Reach for them before writing custom SQL.

AI_* Functions (new unprefixed — preferred)

from cortex_ibis import (
    ai_complete, ai_sentiment, ai_translate,
    ai_classify, ai_extract, ai_filter, ai_redact,
    ai_summarize_agg, ai_agg,          # aggregates
)

# Scalar
table.mutate(sentiment=ai_sentiment(table.body))
table.mutate(translated=ai_translate(table.body, "en", "es"))
table.mutate(reply=ai_complete("claude-4-sonnet", "Reply to: " + table.body))
table.filter(ai_filter("Is this a complaint? " + table.body))

# Aggregate (use inside .agg())
table.group_by("product_id").agg(summary=ai_summarize_agg(table.body))
table.group_by("product_id").agg(
    top_issue=ai_agg("What is the main complaint?", table.body)
)

SNOWFLAKE.CORTEX.* Functions (classic namespaced)

from cortex_ibis import (
    cortex_complete, cortex_summarize, cortex_sentiment,
    cortex_translate, cortex_extract_answer,
    variant_str, variant_float,        # VARIANT helpers used below
)

# cortex_sentiment returns float in [-1, 1]
table.mutate(score=cortex_sentiment(table.body))

# cortex_extract_answer returns VARIANT {answer, score}
raw = cortex_extract_answer(table.body, "What product is reviewed?")
table.mutate(answer=variant_str(raw, "answer"), conf=variant_float(raw, "score"))

VARIANT Helpers

from cortex_ibis import variant_str, variant_float, variant_int, unpack_classify

# Unpack AI_CLASSIFY → {label, score}
cls = ai_classify(table.body, ["billing", "delivery", "product quality"])
table.mutate(
    category=variant_str(cls, "label"),
    score=variant_float(cls, "score"),
)

# Shorthand
unpacked = unpack_classify(cls)   # {'label': StringColumn, 'score': FloatingColumn}

High-Level Helpers

from cortex_ibis import add_sentiment, add_summary, add_classification, add_extraction, add_embeddings

table = add_sentiment(table, "body")                                      # → float 'sentiment'
table = add_summary(table, "body")                                        # → str 'summary'
table = add_classification(table, "body", ["billing", "delivery"])        # → 'category', 'category_score'
table = add_extraction(table, "body", {"order_id": {"type": "string"}})   # → VARIANT 'extracted'
table = add_embeddings(table, "body", model="snowflake-arctic-embed-m-v1.5", dims=768)  # → VECTOR 'embedding'

SQL Preview (always do this before .execute())

import ibis
print(ibis.to_sql(table_expr, dialect="snowflake"))

EnrichmentPipeline Reference

Fluent builder for composing Cortex enrichment steps. Nothing runs until .execute() or .cache().

from cortex_ibis import EnrichmentPipeline

result = (
    EnrichmentPipeline(con.table("CUSTOMER_REVIEWS"))
    .filter_ai("Is this written in English? ", "body")       # drops non-English rows
    .classify("body", ["billing", "delivery", "product quality", "support"])
    .sentiment("body")                                         # float score column
    .summarize("body")                                         # abstractive summary
    .embed("body", model="snowflake-arctic-embed-m-v1.5", dims=768)
    .translate("body", "en", "es", out="body_es")
    .complete("body", "Write a brief customer-service reply to: ", model="claude-4-sonnet")
    .execute()                                                 # → pandas DataFrame
)
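The deferred-execution idea behind a fluent builder can be illustrated with a toy class. This is not how EnrichmentPipeline is implemented; LazyPipeline and its methods are hypothetical and operate on plain Python dicts rather than Ibis expressions. The point is only that each chained call records a step and nothing runs until execute().

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]
Step = Tuple[str, Callable[[List[Row]], List[Row]]]


class LazyPipeline:
    """Toy fluent builder: each method records a step; execute() runs them in order."""

    def __init__(self, rows: List[Row]):
        self._rows = rows
        self._steps: List[Step] = []

    def filter(self, predicate: Callable[[Row], bool]) -> "LazyPipeline":
        self._steps.append(("filter", lambda rows: [r for r in rows if predicate(r)]))
        return self  # returning self is what enables chaining

    def add_column(self, name: str, fn: Callable[[Row], object]) -> "LazyPipeline":
        self._steps.append((name, lambda rows: [{**r, name: fn(r)} for r in rows]))
        return self

    def plan(self) -> List[str]:
        """Inspect the recorded steps without running them."""
        return [label for label, _ in self._steps]

    def execute(self) -> List[Row]:
        rows = self._rows
        for _, step in self._steps:
            rows = step(rows)
        return rows


rows = [{"body": "late delivery"}, {"body": ""}]
pipeline = (
    LazyPipeline(rows)
    .filter(lambda r: bool(r["body"]))
    .add_column("length", lambda r: len(r["body"]))
)
print(pipeline.plan())      # steps are recorded, nothing has run yet
print(pipeline.execute())
```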

Available Chain Methods

| Method | Output column(s) | Notes |
|---|---|---|
| .classify(col, categories) | category, category_score | AI_CLASSIFY + VARIANT unpack |
| .sentiment(col) | sentiment | SNOWFLAKE.CORTEX.SENTIMENT float |
| .summarize(col) | summary | SNOWFLAKE.CORTEX.SUMMARIZE |
| .embed(col, model, dims) | embedding | EMBED_TEXT_768 or EMBED_TEXT_1024 |
| .translate(col, src, tgt) | translated | SNOWFLAKE.CORTEX.TRANSLATE |
| .complete(col, prefix, model) | completion | AI_COMPLETE |
| .filter_ai(condition, col) | none (filters rows) | AI_FILTER |

Materialise to Snowflake Table

# Returns an Ibis table expression pointing at the new table
enriched = (
    EnrichmentPipeline(reviews)
    .classify("body", ["billing", "support"])
    .sentiment("body")
).cache(con, "REVIEWS_ENRICHED", overwrite=True)

Inspect SQL Without Running

pipeline = EnrichmentPipeline(reviews).sentiment("body").summarize("body")
print(pipeline.sql())    # compiled Snowflake SQL

Pattern: Filter First, Enrich Only Relevant Rows

# Cheap vector pre-filter → expensive LLM only on matched subset
from cortex_ibis import embed_768, vector_cosine_similarity
import ibis

query_vec = embed_768("snowflake-arctic-embed-m-v1.5", ibis.literal("refund request"))
relevant = (
    embed_tbl
    .mutate(sim=vector_cosine_similarity(embed_tbl.embedding, query_vec))
    .filter(ibis._.sim > 0.75)
)
# Now enrich only ~relevant rows (much cheaper than enriching everything)
result = EnrichmentPipeline(relevant).classify("body", ["billing"]).execute()

Embeddings Reference

Two embedding functions are available as Ibis built-in UDFs.

Functions

from cortex_ibis import embed_768, embed_1024

# Returns VECTOR(FLOAT, 768) — annotated as Array(float32) for Ibis compatibility
vec_col = embed_768("snowflake-arctic-embed-m-v1.5", table.body)

# Returns VECTOR(FLOAT, 1024)
vec_col = embed_1024("snowflake-arctic-embed-l-v2.0", table.body)

add_embeddings Helper

from cortex_ibis import add_embeddings

table = add_embeddings(
    table, "body",
    model="snowflake-arctic-embed-m-v1.5",
    dims=768,
    out="embedding",
)

cache_embeddings — Pre-compute Once, Query Many Times

from cortex_ibis import cache_embeddings

embed_tbl = cache_embeddings(
    con,
    source_table="CUSTOMER_REVIEWS",
    text_col="body",
    dest_table="CUSTOMER_REVIEWS_EMBEDDINGS",
    id_cols=["id", "product_id"],
    model="snowflake-arctic-embed-m-v1.5",
    dims=768,
    overwrite=True,
)

On-the-fly Query Embedding (runs inside Snowflake)

import ibis
from cortex_ibis import embed_768

query_vec = embed_768("snowflake-arctic-embed-m-v1.5", ibis.literal("your query text"))
# Compiled to: SNOWFLAKE.CORTEX.EMBED_TEXT_768('model', 'your query text')

Vector Similarity Functions

from cortex_ibis import vector_cosine_similarity, vector_l2_distance, vector_inner_product

# Cosine: higher = more similar (range [-1, 1])
sim = vector_cosine_similarity(table.embedding, query_vec)

# L2: lower = more similar
dist = vector_l2_distance(table.embedding, query_vec)

# Inner product: for normalised vectors ≡ cosine
dot = vector_inner_product(table.embedding, query_vec)
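The three metrics are easy to check by hand on small vectors. The pure-Python sketch below is independent of the Snowflake functions (all names are hypothetical). It also demonstrates the note above: once both vectors are normalised to unit length, the inner product equals the cosine similarity.

```python
import math
from typing import List, Sequence


def inner_product(a: Sequence[float], b: Sequence[float]) -> float:
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    return sum(x * y for x, y in zip(a, b))


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    norm = math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    if norm == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return inner_product(a, b) / norm


def normalize(v: Sequence[float]) -> List[float]:
    """Scale a non-zero vector to unit length."""
    length = math.sqrt(inner_product(v, v))
    return [x / length for x in v]


a, b = [3.0, 4.0], [4.0, 3.0]
print(round(cosine_similarity(a, b), 4))                    # higher = more similar
print(round(l2_distance(a, b), 4))                          # lower = more similar
print(round(inner_product(normalize(a), normalize(b)), 4))  # matches the cosine value
```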

Ibis Type Note

Snowflake returns VECTOR(FLOAT, N) which has no direct Ibis type. The functions annotate the return as Array(float32) so Ibis accepts the expression — the emitted SQL is valid Snowflake.


Semantic Search Reference

Uses SNOWFLAKE.CORTEX.EMBED_TEXT_768/1024 + VECTOR_COSINE_SIMILARITY / L2 / INNER_PRODUCT. Query embedding is computed inside Snowflake — no Python-side API call needed.

semantic_search() Helper

from cortex_ibis import semantic_search

results = semantic_search(
    embed_tbl,                                 # table with pre-computed 'embedding' column
    text_col="body",
    query="delayed shipment and missing item",
    top_k=10,
    metric="cosine",                           # "cosine" | "l2" | "inner_product"
    model="snowflake-arctic-embed-m-v1.5",
    dims=768,
    id_cols=["id", "product_id", "body"],      # columns to include in result
)
# Returns: id | product_id | body | similarity, ordered by similarity DESC
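
The embed_tbl argument above is the table produced by cache_embeddings(); build it once before searching:

from cortex_ibis import cache_embeddings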

embed_tbl = cache_embeddings(
    con,
    source_table="CUSTOMER_REVIEWS",
    text_col="body",
    dest_table="CUSTOMER_REVIEWS_EMBEDDINGS",
    id_cols=["id", "product_id"],
    model="snowflake-arctic-embed-m-v1.5",
    dims=768,
    overwrite=True,
)
# Created: CUSTOMER_REVIEWS_EMBEDDINGS (id, product_id, body, embedding VECTOR(FLOAT,768))

Manual Similarity with Threshold

from cortex_ibis import embed_768, vector_cosine_similarity
import ibis

query_vec = embed_768("snowflake-arctic-embed-m-v1.5", ibis.literal("broken product"))

results = (
    embed_tbl
    .mutate(sim=vector_cosine_similarity(embed_tbl.embedding, query_vec))
    .filter(ibis._.sim > 0.7)                   # threshold
    .select("id", "product_id", "body", "sim")
    .order_by(ibis.desc("sim"))
    .limit(20)
)

Supported Models

| Model | Dims | Use for |
|---|---|---|
| snowflake-arctic-embed-m-v1.5 | 768 | General semantic search (default) |
| snowflake-arctic-embed-l-v2.0 | 1024 | Higher accuracy, slower |
| e5-base-v2 | 768 | Alternative general-purpose |
| nv-embed-qa-4 | 1024 | Q&A / retrieval tasks |

Cortex REST API Reference

Direct HTTP client in cortex_rest.py. Use when you need streaming, tool calling, or want to bypass the SQL connector.

PAT Auth (default — auto-loaded from config.toml)

from cortex_rest import CortexInferenceClient

client = CortexInferenceClient()   # reads ~/.snowflake/config.toml → connections.myaccount.password

# Headers sent:
# Authorization: Bearer <token>
# X-Snowflake-Authorization-Token-Type: PROGRAMMATIC_ACCESS_TOKEN

JWT Key-Pair Auth

client = CortexInferenceClient(
    auth="jwt",
    account="myorg-myaccount",
    user="<user>",
    private_key_path="~/.ssh/snowflake_rsa_key.p8",
)
# Headers sent:
# Authorization: Bearer <signed_jwt>
# X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT

Simple Complete

resp = client.complete(
    "claude-4-sonnet",
    [{"role": "user", "content": "Summarise this review in one line."}],
    max_tokens=100,
)
text = resp["choices"][0]["message"]["content"]
usage = resp["usage"]   # {prompt_tokens, completion_tokens, total_tokens}

Streaming SSE

for event in client.complete_stream("claude-4-sonnet", messages, max_tokens=500):
    delta = event["choices"][0]["delta"]
    chunk = delta.get("content") or delta.get("text", "")
    print(chunk, end="", flush=True)
# Last event has: event["usage"]["total_tokens"]

Tool Calling (Snowflake tool_spec format)

tools = [{
    "tool_spec": {
        "type": "generic",
        "name": "get_product_details",
        "description": "Look up product metadata by product_id.",
        "input_schema": {
            "type": "object",
            "properties": {"product_id": {"type": "string"}},
            "required": ["product_id"],
        },
    }
}]
resp = client.complete("claude-4-sonnet", messages, tools=tools, max_tokens=250)
# Tool call in response:
content_list = resp["choices"][0]["message"]["content_list"]
tool_calls = [c for c in content_list if c.get("type") == "tool_use"]
# → [{"type": "tool_use", "tool_use": {"name": "get_product_details", "input": {"product_id": "P003"}}}]

Sampling Parameters — Important

| Model | temperature / top_p / top_k |
|---|---|
| claude-4-sonnet, llama3.3-70b, etc. | Accepted |
| claude-opus-4-7 and newer Opus | Removed; any non-default value returns 400 |

Strip before sending for Opus 4.7+:

for k in ("temperature", "top_p", "top_k"):
    payload.pop(k, None)

Error Handling

import httpx
try:
    resp = client.complete("bad-model", messages)
except httpx.HTTPStatusError as exc:
    print(exc.response.status_code)   # 400
    print(exc.response.json())        # {"message": "unknown model \"bad-model\""}

Module-Level Shortcuts

from cortex_rest import complete, stream

# One-shot (returns string)
print(complete("What is 2+2?"))

# Streaming (yields chunks)
for chunk in stream("Explain vector search in two sentences."):
    print(chunk, end="", flush=True)

LiteLLM Integration Note

When routing through LiteLLM proxy, prefix the token with pat/:

api_key: os.environ/SNOWFLAKE_PAT   # .env: SNOWFLAKE_PAT=pat/<raw_token>

For direct CortexInferenceClient, use the raw token (no prefix).


Shannon Entropy & Distribution Analysis Reference

These functions live in section 10 of cortex_ibis.py. Use them to measure category diversity per group.

Intuition

Shannon entropy quantifies how unpredictable a distribution is:

| H (bits) | Meaning for 4-category reviews |
|---|---|
| 2.0 | Perfectly uniform: equal spread across billing/delivery/defect/positive |
| 1.0–1.9 | Mixed: 2–3 categories dominant |
| 0.3–1.0 | Concentrated: one category dominates (~70–90%) |
| 0.0 | Single category: 100% of reviews in one bucket |

Product insight: high-entropy products need broad support coverage; low-entropy products have a focused root cause.

category_entropy() — pure SQL via Ibis

import ibis
from cortex_ibis import add_classification, category_entropy

# Input: any Ibis table with a group col and a category col
# (e.g. output of add_classification())
classified = add_classification(reviews_tbl, "body",
                                ["billing issue", "delivery problem",
                                 "product defect", "positive feedback"])

entropy_tbl = category_entropy(
    classified,
    group_cols=["product_id"],
    category_col="category",
)
# → product_id | entropy | dominant_category | dominant_share
# Ordered by entropy ASC (lowest = most concentrated)

# Preview SQL before running
print(ibis.to_sql(entropy_tbl, dialect="snowflake"))

# Execute
df = entropy_tbl.execute()

normalized_entropy() — [0, 1] scale

from cortex_ibis import normalized_entropy

norm_tbl = normalized_entropy(
    classified,
    group_cols=["product_id"],
    category_col="category",
    num_categories=4,       # must match the actual number of distinct labels
)
# Adds 'normalized_entropy' column: 0.0 = single class, 1.0 = perfectly uniform

entropy_from_pandas() — scipy path

from cortex_ibis import entropy_from_pandas

# From raw rows
result = entropy_from_pandas(df, group_col="product_id", category_col="true_category")

# From pre-aggregated counts
counts_df = df.groupby(["product_id", "category"]).size().reset_index(name="n")
result = entropy_from_pandas(counts_df, "product_id", "category", count_col="n")

# Returns: product_id | entropy | normalized_entropy | dominant_category | dominant_share

Synthetic Dataset

synthetic_data.py generates 300 reviews across 8 products with controlled Dirichlet distributions:

from synthetic_data import make_reviews, distribution_summary

df = make_reviews(seed=42)                # 300 rows: id, product_id, body, true_category
summary = distribution_summary(df)       # pivot with per-product counts + true_entropy_bits

| Product | Profile | True H |
|---|---|---|
| P001 | Uniform | ~2.00 bits |
| P002 | 90% product defect | ~0.47 bits |
| P003 | Bimodal billing+delivery | ~1.0 bits |
| P004 | All positive | ~0.0 bits |
| P005 | Bimodal delivery+defect | ~1.0 bits |
| P006 | Slight positive skew | ~1.9 bits |
| P007 | 90% billing | ~0.47 bits |
| P008 | Trimodal billing/delivery/defect | ~1.58 bits |

Miller–Madow Bias Note

Sample entropy underestimates true entropy for small n. The gap shrinks as n grows:

  • n=30 → sample H can be 0.2–0.6 bits below true H
  • n=300+ → gap < 0.05 bits typically

Use distribution_summary(df) to compare true vs sample entropy on the synthetic dataset.
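The Miller–Madow estimator adds a first-order correction of (m - 1) / (2n) nats to the plug-in entropy, where m is the number of categories actually observed and n is the sample size. The sketch below applies it in bits. The function names are hypothetical, and the guide does not say whether cortex_ibis.py applies this correction itself.

```python
import math
from typing import Sequence


def plugin_entropy_bits(counts: Sequence[int]) -> float:
    """Plain sample (plug-in) entropy in bits."""
    n = sum(counts)
    if n == 0:
        return 0.0
    return sum(-(c / n) * math.log2(c / n) for c in counts if c > 0)


def miller_madow_bits(counts: Sequence[int]) -> float:
    """Plug-in entropy plus the Miller-Madow correction, converted from nats to bits."""
    n = sum(counts)
    if n == 0:
        return 0.0
    observed = sum(1 for c in counts if c > 0)  # m: categories with at least one sample
    correction = (observed - 1) / (2 * n * math.log(2))
    return plugin_entropy_bits(counts) + correction


small = [9, 8, 7, 6]          # n = 30
large = [90, 80, 70, 60]      # n = 300, same proportions
for counts in (small, large):
    gap = miller_madow_bits(counts) - plugin_entropy_bits(counts)
    print(f"n={sum(counts)}: correction = {gap:.4f} bits")
```

The correction shrinks in proportion to 1/n, which matches the observation that the gap narrows as the sample grows.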

Full Demo

python distribution_demo.py
# Prints rich entropy table + ASCII bar chart + exports assets/s_entropy.svg

TurboPuffer Integration Reference

Pipeline: pandas → Ibis memtable → Cortex AI enrichment → Cortex embeddings → TurboPuffer index → ANN/hybrid/filtered search.

See full example in turbopuffer_demo.py.

Setup

pip install turbopuffer
export TURBOPUFFER_API_KEY=tpuf_A1...
export TURBOPUFFER_REGION=aws-us-east-1    # match your Snowflake region

import os
from turbopuffer import Turbopuffer

tpuf = Turbopuffer(api_key=os.environ["TURBOPUFFER_API_KEY"], region=os.environ["TURBOPUFFER_REGION"])

Step 1 — Enrich with Cortex (via Ibis memtable)

import ibis
from cortex_ibis import ai_sentiment, ai_classify, cortex_sentiment, variant_str

tbl = ibis.memtable(df)           # no CREATE TABLE privilege needed
enriched = tbl.mutate(
    sentiment_label=ai_sentiment(tbl.body),
    sentiment_score=cortex_sentiment(tbl.body),
    category=variant_str(ai_classify(tbl.body, CATEGORIES), "label"),
).execute()

Step 2 — Generate Embeddings

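import json

# con, MODEL, and rows_sql are assumed to be defined earlier (see turbopuffer_demo.py).
# Uses raw SQL via con.raw_sql() + CAST to VARCHAR for Python connector compatibility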
sql = f"""
    SELECT id, CAST(SNOWFLAKE.CORTEX.EMBED_TEXT_768('{MODEL}', body) AS VARCHAR) AS vec_str
    FROM (VALUES {rows_sql}) AS t(id, body)
"""
result = con.raw_sql(sql)
vec_map = {row[0]: json.loads(row[1]) for row in result.fetchall()}
df["vector"] = df["id"].map(vec_map)
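The snippet above interpolates rows_sql into the statement without showing how it is built. One possible way is sketched below. The helper names are hypothetical, ids are assumed to be integers, and the escaping covers backslashes and single quotes only. Bind parameters are preferable wherever the connector supports them.

```python
import json
from typing import Iterable, List, Tuple


def sql_string_literal(value: str) -> str:
    """Quote a Python string as a SQL literal, escaping backslashes and single quotes."""
    escaped = value.replace("\\", "\\\\").replace("'", "''")
    return "'" + escaped + "'"


def build_values_rows(rows: Iterable[Tuple[int, str]]) -> str:
    """Render (id, body) pairs as the body of a VALUES clause."""
    return ", ".join(
        f"({int(row_id)}, {sql_string_literal(body)})" for row_id, body in rows
    )


def parse_vector(vec_str: str) -> List[float]:
    """Parse the VARCHAR form of a vector, assumed to be a JSON array as in the snippet above."""
    return [float(x) for x in json.loads(vec_str)]


rows = [(1, "USB port doesn't work"), (2, "Great value")]
rows_sql = build_values_rows(rows)
print(rows_sql)
print(parse_vector("[0.1, 0.2, 0.3]"))
```

Casting the id with int() ensures a malformed id raises an error instead of reaching the SQL text.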

Step 3 — Index into TurboPuffer

ns = tpuf.namespace("cortex-ibis-reviews")
ns.write(
    upsert_rows=[{"id": ..., "vector": [...], "body": ..., "category": ..., "sentiment_label": ...}],
    distance_metric="cosine_distance",
    schema={"body": {"type": "string", "full_text_search": True}, "category": {"type": "string"}},
)

Search Patterns

# ANN (vector-only)
ns.query(rank_by=("vector", "ANN", query_vec), limit=5,
         include_attributes=["body", "category", "sentiment_label"])

# Filtered ANN
ns.query(rank_by=("vector", "ANN", query_vec),
         filters=("sentiment_label", "Eq", "negative"), limit=5)

# Hybrid (70% vector + 30% BM25)
ns.query(rank_by=("Sum", [
    ("Product", 0.7, ("vector", "ANN", query_vec)),
    ("Product", 0.3, ("body", "BM25", query_text)),
]), limit=5)

# Pure BM25 full-text
ns.query(rank_by=("body", "BM25", "refund missing package"), limit=5)

# Aggregations
ns.query(aggregate_by={"count": ("Count",)}, group_by=["category"])

# Namespace branching (copy-on-write, O(1))
branch = tpuf.namespace("cortex-ibis-reviews-branch")
branch.write(branch_from_namespace="cortex-ibis-reviews")

Updated 2026-06-17

This content is provided as is, and is not maintained on an ongoing basis. It may be out of date with current Snowflake instances.