Retrieval-Augmented Generation Mastery
From Basic RAG to GraphRAG, Agentic Systems, and Production Deployments — every technique explained with architecture diagrams and real code examples.
Basic RAG Architecture
RAG grounds an LLM's responses in retrieved, real-world documents, which reduces (but does not eliminate) hallucinations and makes knowledge updatable without retraining.
from openai import OpenAI
import chromadb

client = OpenAI()
chroma = chromadb.Client()
collection = chroma.get_or_create_collection("docs")

def embed(text: str) -> list[float]:
    res = client.embeddings.create(model="text-embedding-3-small", input=text)
    return res.data[0].embedding

def index_documents(docs: list[str]):
    """Embed and store documents in the vector store."""
    embeddings = [embed(doc) for doc in docs]
    collection.add(
        documents=docs,
        embeddings=embeddings,
        ids=[f"doc_{i}" for i in range(len(docs))]
    )

def rag_query(question: str, k: int = 3) -> str:
    # 1️⃣ Embed the user query
    q_emb = embed(question)
    # 2️⃣ Retrieve top-k similar chunks
    results = collection.query(query_embeddings=[q_emb], n_results=k)
    context = "\n\n".join(results["documents"][0])
    # 3️⃣ Augment prompt with retrieved context
    prompt = f"""Use ONLY the context below to answer the question.
Context:
{context}
Question: {question}"""
    # 4️⃣ Generate answer
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Usage
index_documents(["RAG grounds LLMs in real documents...", "Retrieval happens via vector search..."])
answer = rag_query("What is RAG?")
print(answer)
Why RAG beats fine-tuning for most use-cases: RAG lets you update knowledge in seconds by re-indexing, costs only the embedding fee for the documents that changed, and can cite its sources. Fine-tuning bakes knowledge into weights, which is expensive, slow, and opaque.
Chunking Strategies
How you split documents is as important as the retrieval model. Wrong chunk size kills recall and precision.
Fixed-Size (Fast)
Split every N tokens. Simple but breaks sentences mid-thought.
Sentence (Balanced)
NLTK / spaCy sentence boundaries. Clean, natural splits.
Semantic (Best Quality)
Embed sentences, split on cosine distance jumps. Slow but accurate.
RAPTOR (Multi-Level)
Recursive summarization tree. Handles multi-scale questions.
Document-Aware (Structure-Safe)
Markdown headers, HTML tags, PDF structure as boundaries.
Sliding Window (Context-Safe)
Overlapping chunks (e.g. 512 tokens, 64 overlap) preserve context.
Parent-Child (Precision+Context)
Store small chunks for search, return large parent for generation.
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
# Semantic chunker splits where embedding similarity drops sharply
splitter = SemanticChunker(
embeddings=OpenAIEmbeddings(model="text-embedding-3-small"),
breakpoint_threshold_type="percentile", # or "standard_deviation"
breakpoint_threshold_amount=85, # split at 85th percentile jumps
)
docs = splitter.create_documents([long_text])
print(f"Created {len(docs)} semantically coherent chunks")
# ── Parent-Child pattern ──
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=InMemoryStore(),
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Search returns small child chunks → fetches big parent for LLM context
| Strategy | Chunk Size Sweet Spot | Best For | Main Risk |
|---|---|---|---|
| Fixed-Size | 512–1024 tokens | Homogeneous text, high throughput | Splits mid-sentence |
| Semantic | Variable | Technical docs, research papers | Slow & costly to compute |
| Parent-Child | Child 200–400 / Parent 1500–2000 | Balancing precision + context | Storage overhead |
| RAPTOR | Multi-level summaries | Long-form, multi-section docs | Complexity, latency |
| Sliding Window | 512 + 64 overlap | Dense factual text (legal, medical) | Duplicate info in retrieval |
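The sliding-window row above (512 tokens with 64 overlap) can be sketched in a few lines. This is a minimal illustration, assuming the text is already tokenized; the function name `sliding_window_chunks` and the whitespace-style tokens are placeholders rather than part of any library.

```python
def sliding_window_chunks(tokens: list[str], size: int = 512, overlap: int = 64) -> list[list[str]]:
    """Split a token list into windows of `size` that share `overlap` tokens."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # this window already reaches the end of the text
    return chunks


# Placeholder tokens stand in for the output of a real tokenizer.
words = [f"w{i}" for i in range(10)]
chunks = sliding_window_chunks(words, size=4, overlap=1)
for chunk in chunks:
    print(chunk)
```

Each window repeats the last `overlap` tokens of the previous one, which is where the "duplicate info in retrieval" risk in the table comes from.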
Embedding Models
Embeddings are the heart of retrieval quality. Different models excel at different domains and languages.
🏆 OpenAI text-embedding-3-large
Best all-around for English. 3072 dims, supports Matryoshka truncation. ~$0.13/M tokens.
🚀 Cohere embed-v3
Multilingual (100+ langs), 1024 dims, supports int8 quantization. Input-type aware (query vs doc).
🔓 nomic-embed-text-v1.5 (Open)
8192 token context (vs 512 for most), Apache 2.0, runs locally. Excellent for long docs.
⚡ ColBERT / ColPali
Late-interaction: embed every token, not just [CLS]. MaxSim retrieval — dramatically better recall.
# MRL: truncate embedding dimensions without retraining
# text-embedding-3-* supports this natively
from openai import OpenAI
client = OpenAI()
def embed_mrl(text: str, dims: int = 256) -> list[float]:
    """
    Smaller dims = cheaper storage + faster search, small accuracy drop.
    dims=256  → 12× smaller than 3072, ~2% accuracy loss
    dims=1536 → balanced (default size of the small model)
    dims=3072 → maximum quality (default size of the large model)
    """
    res = client.embeddings.create(
        model="text-embedding-3-large",
        input=text,
        dimensions=dims  # ← MRL truncation
    )
    return res.data[0].embedding
# Hybrid retrieval: combine dense + sparse (BM25)
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
bm25 = BM25Retriever.from_documents(docs)
bm25.k = 4
dense = vectorstore.as_retriever(search_kwargs={"k": 4})
# 60% dense, 40% BM25 — best of both worlds
hybrid = EnsembleRetriever(
retrievers=[bm25, dense],
weights=[0.4, 0.6]
)
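To make the MRL trade-off concrete, the sketch below does the storage arithmetic and shows a client-side truncation. It assumes float32 vectors and ignores index overhead; `truncate_and_normalize` and `index_size_bytes` are illustrative names, and manual truncation is only meaningful for models trained with Matryoshka-style objectives.

```python
import math


def truncate_and_normalize(vec: list[float], dims: int) -> list[float]:
    """Keep the first `dims` components and rescale the result to unit length."""
    head = vec[:dims]
    norm = math.sqrt(sum(x * x for x in head))
    if norm == 0:
        return head
    return [x / norm for x in head]


def index_size_bytes(n_vectors: int, dims: int, bytes_per_value: int = 4) -> int:
    """Raw vector storage only (float32 by default), without index overhead."""
    return n_vectors * dims * bytes_per_value


full = index_size_bytes(1_000_000, 3072)
small = index_size_bytes(1_000_000, 256)
ratio = full / small
print(f"3072 dims: {full / 1e9:.2f} GB, 256 dims: {small / 1e9:.2f} GB, ratio {ratio:.0f}x")

# Toy 3-dimensional vector truncated to 2 dimensions.
short = truncate_and_normalize([3.0, 4.0, 12.0], dims=2)
print(short)
```

Re-normalizing after truncation keeps cosine and dot-product scores comparable across vectors of the shortened length.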
Vector Databases
Choosing the right vector store depends on scale, latency requirements, filtering needs, and whether you need managed hosting.
| Database | Best For | Filtering | Scale | Managed | Open Source |
|---|---|---|---|---|---|
| Qdrant | Production RAG, Rust-based speed | ✅ Payload filters | 1B+ vectors | ✓ Cloud | ✓ Apache 2.0 |
| Pinecone | Serverless, zero-ops | ✅ Metadata | Unlimited | ✓ Fully managed | ✗ |
| pgvector | Existing PostgreSQL users | ✅ Full SQL | ~10M vectors | ✓ Supabase/RDS | ✓ PostgreSQL ext |
| Weaviate | Multi-modal, hybrid search built-in | ✅ GraphQL | 100M+ vectors | ✓ Cloud | ✓ BSD 3 |
| Chroma | Local dev, prototyping | ✅ Where filters | ~1M vectors | ✗ | ✓ Apache 2.0 |
| Milvus | Billion-scale enterprise | ✅ Scalar + vector | 10B+ vectors | ✓ Zilliz Cloud | ✓ Apache 2.0 |
from qdrant_client import QdrantClient, models
client = QdrantClient(url="http://localhost:6333")
# Create collection with HNSW index
client.create_collection(
collection_name="docs",
vectors_config=models.VectorParams(size=1536, distance=models.Distance.COSINE),
hnsw_config=models.HnswConfigDiff(m=16, ef_construct=100) # tune for recall/speed
)
# Upsert with metadata payload
client.upsert(
collection_name="docs",
points=[
models.PointStruct(
id=1,
vector=embed("Climate change impacts 2024"),
payload={"source": "ipcc.pdf", "year": 2024, "category": "climate"}
)
]
)
# Filtered semantic search — only docs from 2024+, climate category
results = client.search(
collection_name="docs",
query_vector=embed("global warming effects"),
query_filter=models.Filter(
must=[
models.FieldCondition(key="year", range=models.Range(gte=2024)),
models.FieldCondition(key="category", match=models.MatchValue(value="climate"))
]
),
limit=5
)
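As a mental model for the filtered search above, the following brute-force sketch applies the payload conditions and then ranks the survivors by cosine similarity. It is a teaching aid only: a production engine uses an approximate index rather than a full scan, and the point dictionaries and `filtered_search` helper here are hypothetical.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def filtered_search(points, query_vector, min_year, category, limit=5):
    """Apply payload conditions first, then rank the remaining points by similarity."""
    candidates = [
        p for p in points
        if p["payload"]["year"] >= min_year and p["payload"]["category"] == category
    ]
    ranked = sorted(candidates, key=lambda p: cosine(p["vector"], query_vector), reverse=True)
    return ranked[:limit]


# Toy 2-dimensional vectors; real embeddings have hundreds of dimensions.
points = [
    {"id": 1, "vector": [1.0, 0.0], "payload": {"year": 2024, "category": "climate"}},
    {"id": 2, "vector": [0.9, 0.1], "payload": {"year": 2021, "category": "climate"}},
    {"id": 3, "vector": [0.0, 1.0], "payload": {"year": 2024, "category": "climate"}},
    {"id": 4, "vector": [1.0, 0.0], "payload": {"year": 2024, "category": "finance"}},
]
hits = filtered_search(points, [1.0, 0.0], min_year=2024, category="climate")
print([h["id"] for h in hits])
```

Point 2 is a close match but fails the year condition, and point 4 fails the category condition, so neither can appear in the results no matter how similar its vector is.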
Advanced RAG
Each technique solves a specific failure mode in Naive RAG. Understand the problem first, then apply the fix.
HyDE: Hypothetical Document Embeddings (difficulty: Medium)
Problem: User queries are short and ambiguous, while documents are long and specific. Both are embedded into the same vector space, but a query often lands far from the documents that answer it.
from openai import OpenAI
client = OpenAI()
def hyde_retrieve(query: str, vectorstore, k: int = 5):
    # Step 1: Generate a hypothetical document
    hyp_doc = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "system",
            "content": "Write a factual paragraph that would directly answer this question."
        }, {
            "role": "user",
            "content": query
        }]
    ).choices[0].message.content
    # Step 2: Embed the hypothesis (it resembles a document, so it lands near real documents)
    hyp_embedding = embed(hyp_doc)
    # Step 3: Search with the hypothesis embedding
    results = vectorstore.similarity_search_by_vector(hyp_embedding, k=k)
    return results

# HyDE can improve recall on knowledge-intensive tasks (the ~15-20% figure is
# indicative only; measure on your own queries before adopting it)
RAG-Fusion: Multi-Query + RRF (difficulty: Medium)
Problem: A single query misses relevant docs phrased differently. Solution: generate N query variations and fuse their ranked results.
from langchain.load import dumps, loads
def generate_query_variants(query: str, n: int = 4) -> list[str]:
    prompt = f"""Generate {n} different ways to ask this question.
Output only the questions, one per line.
Original: {query}"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    lines = response.choices[0].message.content.splitlines()
    return [line.strip() for line in lines if line.strip()]  # drop blank lines

def reciprocal_rank_fusion(results: list[list], k: int = 60) -> list:
    """RRF score = Σ 1/(k + rank). Promotes docs appearing high across many queries."""
    fused_scores: dict = {}
    for docs in results:
        for rank, doc in enumerate(docs):
            doc_str = dumps(doc)
            # enumerate() is 0-based, so rank + 1 is the 1-based rank
            fused_scores[doc_str] = fused_scores.get(doc_str, 0) + 1 / (k + rank + 1)
    return [
        loads(doc) for doc, _ in
        sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

def rag_fusion_query(query: str, vectorstore, k: int = 5):
    variants = generate_query_variants(query)
    all_results = [vectorstore.similarity_search(q, k=k) for q in variants]
    return reciprocal_rank_fusion(all_results)[:k]
Self-RAG: Adaptive Retrieval with Reflection (difficulty: Hard)
The LLM decides when to retrieve, critiques its own output, and generates special reflection tokens to self-assess relevance and support.
[Retrieve] token
[ISREL] token
[ISSUP] token
[ISUSE] token
# Full Self-RAG requires a fine-tuned model (selfrag/selfrag_llama2_7b on HuggingFace)
# This shows the conceptual pattern using prompting
SELF_RAG_PROMPT = """You are a Self-RAG assistant. For each question:
1. Decide if retrieval is needed → output [Retrieve] or [No Retrieve]
2. If retrieved docs are relevant → output [ISREL: yes/no]
3. Generate answer grounded in docs → output [ISSUP: fully/partially/no]
4. Rate your answer → output [ISUSE: 5/4/3/2/1]"""
def self_rag(query: str, vectorstore) -> dict:
    # First pass: decide if retrieval is needed
    decision = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SELF_RAG_PROMPT},
            {"role": "user", "content": f"Question: {query}\nShould I retrieve? Output [Retrieve] or [No Retrieve]."}
        ]
    ).choices[0].message.content
    if "[Retrieve]" in decision:
        docs = vectorstore.similarity_search(query, k=3)
        context = "\n".join(d.page_content for d in docs)
    else:
        context = ""  # Answer from parametric knowledge
    # Generate with self-critique tokens (the system prompt defines the token format)
    answer = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SELF_RAG_PROMPT},
            {"role": "user", "content": f"Context: {context}\nQ: {query}\nAnswer + critique tokens:"}
        ]
    ).choices[0].message.content
    return {"answer": answer, "retrieved": bool(context), "context": context}
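The critique tokens only help if something reads them. The sketch below parses the `[ISREL: ...]`, `[ISSUP: ...]`, and `[ISUSE: ...]` markers defined in the prompt above out of a model response. The acceptance rule in `is_acceptable` is a hypothetical policy chosen for illustration, not part of Self-RAG itself.

```python
import re

# Matches the reflection tokens requested by the prompt, e.g. "[ISSUP: fully]".
TOKEN_PATTERN = re.compile(r"\[(ISREL|ISSUP|ISUSE):\s*([^\]]+)\]")


def parse_reflection_tokens(text: str) -> dict:
    """Separate the answer text from its reflection tokens."""
    tokens = {name: value.strip().lower() for name, value in TOKEN_PATTERN.findall(text)}
    answer = TOKEN_PATTERN.sub("", text).strip()
    return {"answer": answer, "tokens": tokens}


def is_acceptable(tokens: dict, min_usefulness: int = 4) -> bool:
    """Hypothetical rule: relevant docs, fully supported, usefulness at or above the threshold."""
    usefulness = tokens.get("ISUSE", "0")
    return (
        tokens.get("ISREL") == "yes"
        and tokens.get("ISSUP") == "fully"
        and usefulness.isdigit()
        and int(usefulness) >= min_usefulness
    )


sample = "RAG retrieves documents before generating. [ISREL: yes] [ISSUP: fully] [ISUSE: 5]"
parsed = parse_reflection_tokens(sample)
print(parsed["answer"])
print(parsed["tokens"], is_acceptable(parsed["tokens"]))
```

A caller could retry with retrieval, or fall back to a refusal, whenever `is_acceptable` returns False.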
CRAG: Corrective RAG (difficulty: Medium)
When retrieved docs score low relevance, CRAG automatically falls back to web search and re-ranks before generating.
from langchain_core.documents import Document
from langgraph.graph import StateGraph, END
from langchain_community.tools.tavily_search import TavilySearchResults

web_search = TavilySearchResults(max_results=3)

def grade_documents(state):
    """Score retrieval relevance; flag for web search if low."""
    docs, question = state["documents"], state["question"]
    filtered, web_needed = [], False
    for doc in docs:
        grade = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content":
                f"Is this document relevant to '{question}'?\nDoc: {doc.page_content[:500]}\nAnswer yes/no"}]
        ).choices[0].message.content.lower()
        if "yes" in grade:
            filtered.append(doc)
        else:
            web_needed = True
    return {"documents": filtered, "web_search": web_needed, "question": question}

def web_search_node(state):
    results = web_search.invoke(state["question"])
    new_docs = [Document(page_content=r["content"]) for r in results]
    # Return the full state so the question survives this step
    return {**state, "documents": state["documents"] + new_docs}

# Build CRAG graph
# retrieve_node and generate_node are assumed to be defined elsewhere
workflow = StateGraph(dict)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("web_search", web_search_node)
workflow.add_node("generate", generate_node)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges("grade_documents", lambda s: "web_search" if s["web_search"] else "generate")
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)
app = workflow.compile()
GraphRAG & Knowledge Graph RAG
Standard RAG retrieves isolated text chunks. GraphRAG builds a knowledge graph first, enabling multi-hop reasoning across connected entities.
🏢 Microsoft GraphRAG
Clusters entities using the Leiden algorithm, generates community summaries at each level, enables global & local search modes.
🦙 LlamaIndex KG Index
Auto-extracts (subject, predicate, object) triples using an LLM, stores in NetworkX/Neo4j, retrieves via keyword or embedding search on graph.
from llama_index.core import KnowledgeGraphIndex, SimpleDirectoryReader
from llama_index.core.graph_stores import SimpleGraphStore
from llama_index.llms.openai import OpenAI as LlamaOpenAI
# Load documents
documents = SimpleDirectoryReader("./data").load_data()
# Build KG Index — LLM extracts SPO triples automatically
kg_index = KnowledgeGraphIndex.from_documents(
documents,
llm=LlamaOpenAI(model="gpt-4o"),
max_triplets_per_chunk=10,
include_embeddings=True, # hybrid: graph + vector
graph_store=SimpleGraphStore(),
)
# Query with graph traversal
query_engine = kg_index.as_query_engine(
include_text=True,
response_mode="tree_summarize",
embedding_mode="hybrid", # keyword + vector on graph
similarity_top_k=5,
)
# Multi-hop: "What did the CEO of the company that built Claude found before Anthropic?"
response = query_engine.query("What companies did Dario Amodei found?")
print(response)
# Visualize the graph
kg_index.get_networkx_graph() # → export to Gephi / pyvis
When to use GraphRAG: Multi-hop questions ("Who is the CEO of the company that built X?"), relationship queries, large document sets with cross-document dependencies (e.g. medical literature, legal case networks).
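The multi-hop idea can be shown without any graph library. The sketch below stores (subject, predicate, object) triples in an adjacency map and follows a chain of predicates. All entities and predicate names are invented for illustration, and a real system would extract the triples with an LLM and keep them in a graph store.

```python
from collections import defaultdict


def build_graph(triples):
    """Index triples by subject so outgoing edges can be looked up quickly."""
    graph = defaultdict(list)
    for subject, predicate, obj in triples:
        graph[subject].append((predicate, obj))
    return graph


def follow(graph, start, predicates):
    """Follow a chain of predicates from `start`; return every entity reached at the end."""
    frontier = {start}
    for predicate in predicates:
        frontier = {
            obj
            for node in frontier
            for pred, obj in graph.get(node, [])
            if pred == predicate
        }
    return frontier


# Hypothetical triples, shaped like the output of an LLM extractor.
triples = [
    ("AcmeDB", "built_by", "Acme Corp"),
    ("Acme Corp", "has_ceo", "J. Doe"),
    ("J. Doe", "founded", "Acme Corp"),
    ("J. Doe", "founded", "Widget Labs"),
]
graph = build_graph(triples)

# "What did the CEO of the company that built AcmeDB found?" is three hops.
answer = follow(graph, "AcmeDB", ["built_by", "has_ceo", "founded"])
print(sorted(answer))
```

Chunk-level vector search would need all three facts to sit in one retrieved chunk; the graph joins them even when they come from different documents.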
Agentic RAG
Agentic RAG combines retrieval with autonomous tool use, planning, and multi-step reasoning. The LLM acts as an agent that decides what to retrieve and when.
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
@tool
def search_docs(query: str) -> str:
    """Search internal knowledge base for relevant documents."""
    results = vectorstore.similarity_search(query, k=3)
    return "\n\n".join(r.page_content for r in results)

@tool
def web_search(query: str) -> str:
    """Search the web for current information not in the knowledge base."""
    results = tavily.invoke(query)
    return str(results[:2])

@tool
def execute_python(code: str) -> str:
    """Execute Python code for calculations and data analysis."""
    import subprocess
    result = subprocess.run(["python3", "-c", code], capture_output=True, text=True, timeout=10)
    return result.stdout or result.stderr
# Build ReAct agent — it autonomously decides which tools to call and when
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_react_agent(
model=llm,
tools=[search_docs, web_search, execute_python],
prompt="You are a research assistant. Use tools to answer questions accurately."
)
# The agent will:
# 1. Think: "I need to search docs first"
# 2. Act: call search_docs
# 3. Observe: get results
# 4. Think: "I need more recent data"
# 5. Act: call web_search
# 6. Observe: get results
# 7. Generate: synthesize final answer
result = agent.invoke({"messages": [("user", "What is the latest RAG benchmark score for GPT-4o?")]})
Agentic RAG risks: Infinite loops (cap max iterations at 10), cost blowup (each tool call = tokens), prompt injection via retrieved docs. Always sandbox code execution and validate tool outputs.
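The iteration cap mentioned above is easy to see in a hand-rolled loop. This is a framework-free sketch under stated assumptions: `decide` stands in for the LLM's next-action choice, and the two policies are stubs written for the example. Agent frameworks usually expose an equivalent setting of their own.

```python
def run_agent(decide, tools, question, max_iterations=10):
    """Run a tool loop that stops at `max_iterations`.

    `decide(question, observations)` returns ("final", answer) or (tool_name, tool_input).
    """
    observations = []
    for _ in range(max_iterations):
        action, payload = decide(question, observations)
        if action == "final":
            return {"answer": payload, "steps": len(observations), "stopped": False}
        if action not in tools:
            # Validate tool names instead of trusting the model's output.
            observations.append((action, "error: unknown tool"))
            continue
        observations.append((action, tools[action](payload)))
    return {"answer": None, "steps": len(observations), "stopped": True}


def looping_policy(question, observations):
    """Stub policy that never finishes, to exercise the cap."""
    return ("search_docs", question)


def two_step_policy(question, observations):
    """Stub policy that searches once and then answers."""
    if observations:
        return ("final", f"Based on: {observations[-1][1]}")
    return ("search_docs", question)


tools = {"search_docs": lambda q: f"docs about {q}"}
capped = run_agent(looping_policy, tools, "RAG", max_iterations=10)
done = run_agent(two_step_policy, tools, "RAG")
print(capped)
print(done)
```

The cap bounds both latency and token spend, since every loop iteration corresponds to at least one model call.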
Multimodal RAG
Retrieve and reason over images, charts, PDFs, audio, and video — not just text.
🖼️ ColPali (2024)
Embeds entire PDF page screenshots with PaliGemma. No OCR needed. Best for charts, diagrams, scanned docs. Strong results on the ViDoRe visual document retrieval benchmark.
🤖 Vision LLM RAG
Describe images/charts with GPT-4o Vision, store descriptions as text chunks, retrieve and feed original image to LLM for generation.
🎧 Audio RAG
Whisper transcription → chunk → embed → retrieve. Add speaker diarization (pyannote) for meeting/podcast Q&A.
import base64
from pathlib import Path
def describe_image(image_path: str) -> str:
    """Use GPT-4o Vision to generate a rich text description of an image."""
    img_data = base64.b64encode(Path(image_path).read_bytes()).decode()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this image in detail, including all text, data, charts, and visual elements."},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_data}"}}
            ]
        }]
    )
    return response.choices[0].message.content

# Index: store both description (for retrieval) and image path (for generation)
def index_images(image_paths: list[str]):
    for path in image_paths:
        description = describe_image(path)
        vectorstore.add_texts(
            texts=[description],
            metadatas=[{"image_path": path, "type": "image"}]
        )

# Retrieve: get description, return original image to the LLM
def multimodal_rag(query: str) -> str:
    results = vectorstore.similarity_search(query, k=2, filter={"type": "image"})
    images_b64 = []
    for doc in results:
        img_data = base64.b64encode(Path(doc.metadata["image_path"]).read_bytes()).decode()
        images_b64.append(img_data)
    messages = [{"role": "user", "content": [
        {"type": "text", "text": f"Answer using these images: {query}"},
        *[{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img}"}} for img in images_b64]
    ]}]
    return client.chat.completions.create(model="gpt-4o", messages=messages).choices[0].message.content
RAG Frameworks
Don't reinvent the wheel. Pick a framework based on your use-case, then customize.
| Framework | Best For | Learning Curve | Production Ready | Unique Feature |
|---|---|---|---|---|
| LlamaIndex | Data-intensive RAG, structured data, agents | Medium | ✓ Yes | Property Graph Index, 100+ data loaders |
| LangChain | Chains, agents, broad ecosystem | Medium | ✓ Yes | LCEL, LangSmith tracing, 600+ integrations |
| Haystack | Production NLP pipelines, search | Low | ✓ Yes | Pipeline YAML config, Haystack Hub |
| DSPy | Optimising prompts & RAG pipelines | High | ✓ Yes | Automatic prompt optimization (MIPRO, BootstrapFewShot) |
| Ragas | Evaluating RAG quality (not building) | Low | ✓ Yes | Automated faithfulness/relevancy/context metrics |
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
# Ingestion pipeline: transform docs → nodes → embeddings → store
pipeline = IngestionPipeline(
transformations=[
SemanticSplitterNodeParser(
embed_model=OpenAIEmbedding(model="text-embedding-3-small"),
breakpoint_percentile_threshold=95
),
OpenAIEmbedding(model="text-embedding-3-small"), # embed nodes
],
vector_store=QdrantVectorStore(client=qdrant_client, collection_name="docs")
)
nodes = pipeline.run(documents=SimpleDirectoryReader("./docs").load_data())
# Query with reranking (cross-encoder re-scores top-20 → return top-5)
index = VectorStoreIndex(nodes, storage_context=storage_ctx)
reranker = SentenceTransformerRerank(model="cross-encoder/ms-marco-MiniLM-L-6-v2", top_n=5)
query_engine = index.as_query_engine(
similarity_top_k=20, # retrieve 20
node_postprocessors=[reranker], # rerank → 5
response_mode="compact"
)
response = query_engine.query("Explain RAG-Fusion")
print(response.source_nodes[0].score) # rerank score
Evaluation Frameworks
You can't improve what you don't measure. These four metrics cover the full RAG quality surface.
📐 Faithfulness
Is every claim in the answer supported by the retrieved context? Catches hallucinations introduced by the generator.
🎯 Answer Relevance
Does the answer actually address what was asked? Penalises verbose or off-topic responses.
🔍 Context Precision
Of everything retrieved, how much was actually needed? High noise = low precision.
📦 Context Recall
Did the retrieved context contain all the information needed to answer correctly?
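The two retrieval-side metrics can be approximated by hand when you have labelled relevant chunks. This is a deliberately simplified, set-based sketch: evaluation libraries typically use LLM judgments and rank-aware formulas, so treat these functions as intuition builders rather than replacements. The chunk IDs are made up.

```python
def context_precision(retrieved: list[str], relevant: set[str]) -> float:
    """Share of retrieved chunks that were actually needed."""
    if not retrieved:
        return 0.0
    return sum(1 for chunk_id in retrieved if chunk_id in relevant) / len(retrieved)


def context_recall(retrieved: list[str], relevant: set[str]) -> float:
    """Share of needed chunks that made it into the retrieved set."""
    if not relevant:
        return 1.0  # nothing was needed, so nothing was missed
    return len(relevant & set(retrieved)) / len(relevant)


retrieved = ["c1", "c2", "c3", "c4"]   # what the retriever returned
relevant = {"c1", "c3", "c9"}          # what a human marked as necessary

precision = context_precision(retrieved, relevant)
recall = context_recall(retrieved, relevant)
print(f"precision={precision:.2f} recall={recall:.2f}")
```

Here two of four retrieved chunks were useful (precision 0.5) and one needed chunk, `c9`, was never retrieved (recall 2/3), which points at the retriever rather than the generator.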
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall
from datasets import Dataset
# Build evaluation dataset
eval_data = Dataset.from_dict({
"question": ["What is RAG?", "Who invented the Transformer?"],
"answer": ["RAG augments LLMs with external retrieval...", "The Transformer was introduced by Vaswani et al..."],
"contexts": [
["RAG stands for Retrieval-Augmented Generation..."],
["Attention Is All You Need, Vaswani et al., 2017..."]
],
"ground_truth": ["RAG uses retrieval to augment generation.", "Vaswani et al. invented the Transformer in 2017."]
})
results = evaluate(
eval_data,
metrics=[faithfulness, answer_relevancy, context_precision, context_recall]
)
print(results.to_pandas()[["faithfulness","answer_relevancy","context_precision","context_recall"]])
# faithfulness answer_relevancy context_precision context_recall
# 0.92 0.87 0.91 0.78
from deepeval import assert_test
from deepeval.metrics import FaithfulnessMetric, HallucinationMetric
from deepeval.test_case import LLMTestCase

# DeepEval integrates with pytest, so RAG quality can act as a CI/CD gate
def test_rag_faithfulness():
    test_case = LLMTestCase(
        input="What is CRAG?",
        actual_output="CRAG uses a relevance grader to decide when to use web search as fallback.",
        retrieval_context=["CRAG corrects retrieval by scoring doc relevance..."],
        # HallucinationMetric compares the output against `context`, not `retrieval_context`
        context=["CRAG corrects retrieval by scoring doc relevance..."],
    )
    faithfulness = FaithfulnessMetric(threshold=0.8, model="gpt-4o", include_reason=True)
    hallucination = HallucinationMetric(threshold=0.2)
    assert_test(test_case, [faithfulness, hallucination])
    # Fails CI if faithfulness < 0.8 or hallucination > 0.2
Production Considerations
⚡ Latency Optimization
- → Embedding cache — Redis TTL on frequent queries (50–80% cache hit typical)
- → HNSW tuning — ef=64 balances recall (98%) vs latency (<5ms)
- → Async retrieval — asyncio.gather for parallel chunk fetches
- → Streaming — stream LLM tokens, don't wait for full response
- → Quantized embeddings — int8 Cohere v3 = 4× smaller, <1% quality loss
💰 Cost Control
- → MRL truncation — 256 dims = 12× cheaper than 3072, ~2% accuracy loss
- → Small embed model — nomic-embed (free, local) for non-critical paths
- → Context window discipline — top-3 chunks, not top-20
- → Generator routing — GPT-4o-mini for simple Qs, GPT-4o for complex
- → Batch indexing: embed in bulk with text-embedding-3-small ($0.00002/1K tokens) rather than text-embedding-3-large ($0.00013/1K tokens) where quality allows
🔐 Security
- → Prompt injection — retrieved docs can inject instructions; use guardrails
- → Access control — filter vectorstore by user permissions before retrieval
- → PII in docs — scan chunks before indexing (Presidio, AWS Comprehend)
- → Source attribution — always return source URLs/page numbers for auditing
📊 Observability
- → LangSmith — trace every retrieval+generation call end-to-end
- → Arize Phoenix — open-source LLM observability, embedding drift
- → Key metrics — latency P50/P95/P99, token cost/query, retrieval hit rate
- → Embedding drift — alert when query dist drifts from index dist
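The latency percentiles listed under key metrics can be computed from raw request timings with a nearest-rank calculation. This is a small sketch with made-up sample values; a production system would normally rely on its metrics backend rather than computing percentiles in application code.

```python
import math


def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% of values at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]


# Hypothetical end-to-end latencies in milliseconds for ten RAG requests.
latencies_ms = [120, 95, 110, 105, 480, 100, 130, 98, 102, 2500]
summary = {f"p{p}": percentile(latencies_ms, p) for p in (50, 95, 99)}
print(summary)
```

The median looks healthy while the tail is dominated by a single slow request, which is why P95 and P99 are tracked alongside P50.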
import asyncio
import hashlib
import json

import redis
from openai import AsyncOpenAI

aclient = AsyncOpenAI()
cache = redis.Redis(host="localhost", port=6379)

def cache_key(text: str) -> str:
    return f"emb:{hashlib.sha256(text.encode()).hexdigest()}"

async def embed_cached(text: str) -> list[float]:
    """Embedding with Redis cache: avoids re-embedding identical queries."""
    key = cache_key(text)
    if cached := cache.get(key):
        return json.loads(cached)  # parse JSON; never eval() data read back from a cache
    emb = (await aclient.embeddings.create(model="text-embedding-3-small", input=text)).data[0].embedding
    cache.setex(key, 3600, json.dumps(emb))  # 1hr TTL
    return emb

async def rag_stream(question: str, vectorstore):
    """Full async RAG with streaming generator output."""
    q_emb = await embed_cached(question)
    docs = await asyncio.to_thread(vectorstore.similarity_search_by_vector, q_emb, k=3)
    context = "\n\n".join(d.page_content for d in docs)
    prompt = f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
    stream = await aclient.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True  # ← stream tokens as they arrive
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content  # stream to client
Claude for RAG Applications
Claude models are a top choice for the generation step in RAG, especially for tasks requiring careful reasoning, long-context handling, and instruction following.
| Model | Released | Context | RAG Sweet Spot | Cost (input/1M) |
|---|---|---|---|---|
| Claude 1.x | Mar 2023 | 9K | Historical baseline | N/A (retired) |
| Claude 2.1 | Nov 2023 | 200K | Long-doc RAG | $8 |
| Claude 3 Haiku | Mar 2024 | 200K | High-throughput, cost-sensitive | $0.25 |
| Claude 3 Sonnet | Mar 2024 | 200K | Balanced RAG workloads | $3 |
| Claude 3 Opus | Mar 2024 | 200K | Complex multi-hop reasoning | $15 |
| Claude 3.5 Sonnet | Jun 2024 | 200K | Best price/performance for RAG | $3 |
| Claude 3.5 Haiku | Nov 2024 | 200K | Fast, cheap agentic RAG | $0.80 |
| Claude 3.7 Sonnet | Feb 2025 | 200K | Extended thinking, multi-step RAG | $3 |
| Claude 4 Series | 2025 | 200K+ | Most capable generation step | Varies |
import anthropic
client = anthropic.Anthropic()
def claude_rag(question: str, context_docs: list[str]) -> str:
    """
    Claude 3.7 Sonnet with extended thinking, suited to complex
    multi-step RAG where reasoning quality matters most.
    """
    context = "\n\n---\n\n".join(context_docs)
    response = client.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=8000,  # must be larger than budget_tokens
        thinking={"type": "enabled", "budget_tokens": 5000},  # extended thinking
        messages=[{
            "role": "user",
            "content": f"""You are a research assistant. Use ONLY the provided context.

Context:
{context}

Question:
{question}

Answer with citations. If the context is insufficient, say so."""
        }]
    )
    # Extract thinking + answer separately (thinking_text is useful for logging)
    thinking_text = next((b.thinking for b in response.content if b.type == "thinking"), "")
    answer = next(b.text for b in response.content if b.type == "text")
    return answer
# Also works with prompt caching — reduces costs up to 90% on repeated context
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{
"role": "user",
"content": [{
"type": "text",
"text": large_context,
"cache_control": {"type": "ephemeral"} # cache the context prefix
}, {
"type": "text",
"text": question
}]
}]
)
On "unreleased" Anthropic models: Anthropic does not publicly disclose models that are in development or trained but not released. Any specific claims about named unreleased models circulating online are speculation, not official announcements. The safest source is anthropic.com/news and the official API model list.
Which RAG Technique Should I Use?
Query is vague / short
Use HyDE — generate a hypothetical answer and use that embedding for retrieval.
Missing relevant docs
Use RAG-Fusion — multiple query variants catch documents phrased differently.
Fresh info needed
Use CRAG — auto-fallback to web search when internal docs score low relevance.
Multi-hop questions
Use GraphRAG — traverse entity relationships across documents.
Complex, multi-step tasks
Use Agentic RAG — LLM plans and uses tools autonomously.
Charts / PDFs / images
Use ColPali or Vision RAG — page-level visual embeddings or describe-then-retrieve.
The AWS Mental Model
AWS is a collection of building blocks. The key is knowing which block solves which problem — don't memorise all 200+ services, learn the 15 that cover 95% of real workloads.
| Category | Service | One-liner | When NOT to use |
|---|---|---|---|
| Compute | EC2 | Virtual machine — full OS control | Short-lived tasks (<15 min) → use Lambda |
| Compute | Lambda | Function-as-a-service, event-driven | Long-running processes (>15 min) → use ECS |
| Containers | ECS / EKS | Run Docker containers at scale | Simple apps — over-engineering |
| Storage | S3 | Unlimited object storage | Frequent random reads/writes → use EFS/EBS |
| Database | RDS / Aurora | Managed PostgreSQL / MySQL | Massive scale >100k writes/s → DynamoDB |
| Database | DynamoDB | Managed NoSQL, millisecond latency | Complex joins / ACID transactions → RDS |
| Cache | ElastiCache | Managed Redis / Memcached | Persistent data → it's a cache, not a DB |
| Networking | CloudFront | CDN + WAF + edge caching | Internal-only APIs with no public traffic |
| Messaging | SQS | Durable message queue, decouples services | Real-time fanout → use SNS or EventBridge |
| Security | IAM | Identity, roles, permissions for everything | Never skip — always use least-privilege |
| Security | Secrets Manager | Store API keys, DB passwords securely | Public config values → use SSM Parameter Store |
| Observability | CloudWatch | Logs, metrics, alarms, dashboards | Complex APM needs → pair with X-Ray or Datadog |
S3 — Simple Storage Service
Virtually unlimited object storage. Durability: 99.999999999% (11 nines). The backbone of almost every AWS architecture.
🗂️ Storage Classes
- Standard — hot data, frequent access, $0.023/GB
- Standard-IA — infrequent access, 40% cheaper
- Intelligent-Tiering — auto-moves between tiers
- Glacier — archival, retrieval in mins/hours, $0.004/GB
- Glacier Deep Archive — $0.00099/GB, 12hr retrieval
🔒 Access Control
- Bucket Policy — JSON, controls access at bucket level
- IAM Policy — controls which identities can access
- Pre-signed URLs — temporary access without credentials
- Block Public Access — always enable on account level
- S3 Object Lock — WORM compliance (financial, medical)
⚡ Power Features
- S3 Select — query CSV/JSON inside objects (no download)
- Event Notifications — trigger Lambda on upload
- Multipart Upload: recommended for objects >100MB, required above the 5GB single-PUT limit
- Transfer Acceleration: routes uploads through CloudFront edge locations; the speedup depends on the distance to the bucket's region
- Replication (CRR/SRR) — cross-region / same-region
import boto3
from botocore.exceptions import ClientError
s3 = boto3.client("s3", region_name="us-east-1")
# ── Upload a file ──
s3.upload_file(
Filename="report.pdf",
Bucket="my-bucket",
Key="reports/2025/report.pdf",
ExtraArgs={
"ContentType": "application/pdf",
"ServerSideEncryption": "AES256", # always encrypt at rest
"StorageClass": "STANDARD_IA", # cheaper for infrequent reads
}
)
# ── Generate pre-signed URL (expires in 1 hour) ──
url = s3.generate_presigned_url(
"get_object",
Params={"Bucket": "my-bucket", "Key": "reports/2025/report.pdf"},
ExpiresIn=3600
)
print(url) # share with client — no AWS credentials needed
# ── Multipart upload for large files ──
from boto3.s3.transfer import TransferConfig
config = TransferConfig(multipart_threshold=25 * 1024 * 1024) # 25MB threshold
s3.upload_file("bigfile.zip", "my-bucket", "uploads/bigfile.zip", Config=config)
# ── Set lifecycle rule — move to Glacier after 90 days, delete after 365 ──
s3.put_bucket_lifecycle_configuration(
Bucket="my-bucket",
LifecycleConfiguration={"Rules": [{
"ID": "archive-old-reports",
"Status": "Enabled",
"Filter": {"Prefix": "reports/"},
"Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
"Expiration": {"Days": 365}
}]}
)
# ── Trigger Lambda on new object ──
# (done in S3 console → Event notifications → Lambda function)
# Lambda handler receives: event["Records"][0]["s3"]["bucket"]["name"] + key
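A handler for that event might look like the sketch below. It is a minimal illustration that only extracts the bucket and key from each record; the sample event is trimmed to the fields used here, and what you do with each object afterwards is up to your application.

```python
from urllib.parse import unquote_plus


def handler(event, context):
    """Return (bucket, key) pairs for every record in an S3 event notification."""
    uploaded = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded (a space becomes "+"), so decode them.
        key = unquote_plus(record["s3"]["object"]["key"])
        uploaded.append((bucket, key))
    return {"uploaded": uploaded}


# Trimmed-down sample event for local testing; real events carry more fields.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-bucket"},
                "object": {"key": "reports/2025/annual+report.pdf"}}}
    ]
}
result = handler(sample_event, None)
print(result)
```

Looping over `Records` rather than reading only index 0 keeps the handler correct if a notification ever carries more than one record.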
S3 Cost Tips: Enable S3 Intelligent-Tiering for any bucket where access patterns are unknown — it costs $0.0025/1K objects/month but can save 40-90% on storage. Always enable Block Public Access. Use server-side encryption (free). Request counts cost money — batch small operations.
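The effect of a lifecycle rule on the bill can be estimated with simple arithmetic. The sketch below uses the per-GB prices quoted in this section and assumes 30-day months; it ignores request, retrieval, and transition fees as well as minimum storage durations, so treat the output as a rough comparison only.

```python
# Per-GB monthly prices as quoted in this section (check current regional pricing).
PRICE_PER_GB_MONTH = {
    "STANDARD": 0.023,
    "GLACIER": 0.004,
    "DEEP_ARCHIVE": 0.00099,
}


def monthly_storage_cost(gb: float, storage_class: str) -> float:
    return gb * PRICE_PER_GB_MONTH[storage_class]


def lifecycle_cost(gb: float, months: int, days_before_archive: int = 90) -> float:
    """Storage cost when data stays in Standard first and then moves to Glacier."""
    hot_months = min(months, days_before_archive // 30)
    cold_months = months - hot_months
    return (hot_months * monthly_storage_cost(gb, "STANDARD")
            + cold_months * monthly_storage_cost(gb, "GLACIER"))


always_hot = 12 * monthly_storage_cost(1000, "STANDARD")
tiered = lifecycle_cost(1000, months=12)
print(f"1 TB for a year: Standard only ${always_hot:.2f}, with 90-day archive rule ${tiered:.2f}")
```

For 1,000 GB kept for a year, the 90-day transition cuts the storage line from about $276 to about $105 under these assumptions.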
EC2 — Elastic Compute Cloud
Virtual machines in the cloud. You choose the OS, CPU, RAM, storage. The most flexible compute option — but also the most to manage.
| Family | Type | Use Case | Example | vCPU / RAM |
|---|---|---|---|---|
| General | t3, t4g, m7g | Web servers, small DBs, dev/staging | t3.micro | 2 / 1 GB |
| Compute | c7g, c6i | High-CPU: encoding, ML inference, HPC | c6i.2xlarge | 8 / 16 GB |
| Memory | r7g, x2gd | In-memory DBs, large caches, SAP | r6g.2xlarge | 8 / 64 GB |
| GPU | p4d, g5 | ML training, video rendering, CUDA | g5.xlarge | 4 / 16 GB + A10G |
| Storage | i4i, d3 | High I/O, data warehousing, Hadoop | i4i.xlarge | 4 / 32 GB + NVMe |
💰 Pricing Models
- On-Demand — pay per second, no commitment. Most expensive.
- Reserved (1-3yr) — up to 72% off. Predictable workloads.
- Spot — up to 90% off. Can be interrupted. Batch/ML workloads.
- Savings Plans — flexible, up to 66% off (Compute Savings Plans). Apply across instance families.
💾 Storage (EBS)
- gp3 — default, 3000 IOPS baseline. $0.08/GB/month.
- io2 Block Express — up to 256K IOPS. Databases.
- st1 — throughput-optimised HDD. Log processing.
- sc1 — cold HDD. Cheapest, low access frequency.
🛡️ Auto Scaling
- Launch Template — defines the AMI, instance type, SG, IAM role
- ASG — min/max/desired count, scaling policies
- Target Tracking — keep CPU at 70%, auto-adds instances
- Warm Pools — pre-initialized instances, sub-30s scale-out
# Launch an EC2 instance (al2023, t3.micro, us-east-1)
aws ec2 run-instances \
--image-id ami-0c02fb55956c7d316 \
--instance-type t3.micro \
--key-name my-keypair \
--security-group-ids sg-xxxxxxxxxx \
--subnet-id subnet-xxxxxxxxxx \
--iam-instance-profile Name=MyAppRole \
--user-data '#!/bin/bash
yum update -y
yum install -y docker
systemctl start docker
systemctl enable docker
docker pull my-app:latest
docker run -d -p 80:8000 my-app:latest' \
--tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=my-app}]' \
--count 1
# Connect via SSM Session Manager (no SSH key needed — more secure)
aws ssm start-session --target i-xxxxxxxxxxxxxxxxx
# Create a snapshot of EBS volume
aws ec2 create-snapshot \
--volume-id vol-xxxxxxxxxx \
--description "Before deployment snapshot $(date +%Y-%m-%d)"
# Allocate and associate Elastic IP
EIP=$(aws ec2 allocate-address --domain vpc --query AllocationId --output text)
aws ec2 associate-address --instance-id i-xxxxxxxxxx --allocation-id $EIP
Lambda — Serverless Functions
Run code without managing servers. You pay only for the milliseconds your function actually runs. Scales to zero, scales to millions.
import json
from urllib.parse import unquote_plus
import boto3
# ── Pattern 1: API Gateway trigger (REST API) ──
def handler_api(event, context):
    # API Gateway sends "body": null when the request has no body
    body = json.loads(event.get("body") or "{}")
    name = body.get("name", "World")
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"message": f"Hello, {name}!"})
    }
# ── Pattern 2: S3 trigger (process uploaded file) ──
def handler_s3(event, context):
    s3 = boto3.client("s3")
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 events (spaces become "+")
        key = unquote_plus(record["s3"]["object"]["key"])
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read().decode("utf-8")
        print(f"Processing {key}: {len(content)} characters")
        # ... process and write result back to S3
# ── Pattern 3: SQS trigger (process messages) ──
def handler_sqs(event, context):
    for record in event["Records"]:
        message = json.loads(record["body"])
        process_job(message)  # process_job: your own job handler, defined elsewhere
    # Lambda deletes the batch from SQS when the handler returns without raising
# ── Powertools for best practices ──
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit
logger = Logger(service="my-service")
tracer = Tracer()
metrics = Metrics(namespace="MyApp")
@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event, context):
    metrics.add_metric(name="RequestCount", unit=MetricUnit.Count, value=1)
    logger.info("Processing request", extra={"event": event})
    # structured JSON logs + X-Ray traces + CloudWatch metrics, all wired up
Cold starts: The first invocation of a new Lambda execution environment typically takes 200ms–2s. Fix with Provisioned Concurrency (keeps environments warm, costs roughly $0.015/hr per GB of configured memory) or use Lambda SnapStart for JVM. For Python/Node, cold starts are often <200ms, which is usually acceptable.
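For the SQS trigger pattern, one failed message normally causes the whole batch to be retried. A minimal sketch of Lambda's partial batch response is shown here; it assumes `ReportBatchItemFailures` is enabled on the event source mapping, and `process_job` is a hypothetical handler invented for the illustration.

```python
import json

def process_job(message: dict) -> None:
    """Hypothetical job handler: rejects messages without a job_id."""
    if "job_id" not in message:
        raise ValueError("missing job_id")

def handler_sqs_partial(event, context):
    """Report only the failed records so the successful ones are deleted."""
    failures = []
    for record in event["Records"]:
        try:
            process_job(json.loads(record["body"]))
        except Exception:
            # Only this message becomes visible again for a retry
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}

# Local usage with a hand-built event (no AWS call is made)
event = {"Records": [
    {"messageId": "m-1", "body": json.dumps({"job_id": "abc123"})},
    {"messageId": "m-2", "body": json.dumps({"type": "process_image"})},
]}
result = handler_sqs_partial(event, None)
print(result)
```

Returning an empty `batchItemFailures` list tells Lambda the whole batch succeeded.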
Databases — RDS, Aurora, DynamoDB
🐘 RDS / Aurora PostgreSQL
Managed relational DB. AWS cites up to 3× the throughput of standard PostgreSQL for Aurora (up to 5× for MySQL), and Aurora automatically replicates across 3 AZs with 6 copies of data.
Aurora Serverless v2 scales from 0.5 to 128 ACUs in seconds — perfect for variable workloads. Minimum cost: ~$43/month.
⚡ DynamoDB
Key-value + document NoSQL. Single-digit millisecond at any scale. No schema to manage. Global tables for multi-region active-active.
Design access patterns first — DynamoDB's single-table design requires knowing queries upfront. Wrong key design = full table scans = $$$.
import json, time
import boto3, psycopg2
from boto3.dynamodb.conditions import Key
# ── RDS PostgreSQL via psycopg2 ──
# Get connection string from Secrets Manager (never hardcode)
def get_db_connection():
    sm = boto3.client("secretsmanager")
    secret = sm.get_secret_value(SecretId="prod/postgres/main")
    creds = json.loads(secret["SecretString"])
    return psycopg2.connect(
        host=creds["host"], port=5432,
        database=creds["dbname"],
        user=creds["username"], password=creds["password"],
        sslmode="require"  # always require SSL on RDS
    )
# ── DynamoDB single-table design ──
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("app-table")
# Write an item
table.put_item(Item={
    "PK": "USER#usr_123",     # partition key
    "SK": "PROFILE#usr_123",  # sort key
    "name": "Alice",
    "email": "[email protected]",
    "ttl": int(time.time()) + 86400 * 30,  # auto-expire in 30 days
})
# Query a user's orders (one partition key, sort keys starting with ORDER#)
response = table.query(
    KeyConditionExpression=Key("PK").eq("USER#usr_123") & Key("SK").begins_with("ORDER#"),
    ScanIndexForward=False,  # descending sort-key order (newest first if the SK embeds a timestamp)
    Limit=20
)
# Conditional write (only decrement if enough stock remains)
try:
    table.update_item(
        Key={"PK": "PRODUCT#p1", "SK": "PRODUCT#p1"},
        UpdateExpression="SET stock = stock - :qty",
        ConditionExpression="stock >= :qty",
        ExpressionAttributeValues={":qty": 5}
    )
except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
    print("Out of stock!")
# DynamoDB Streams → trigger Lambda on every write
# (wired in AWS console / CDK / Terraform)
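To make "design access patterns first" concrete, here is a small in-memory sketch of the single-table key scheme. `FakeTable`, `user_pk`, and `order_sk` are names invented for this illustration and are not part of boto3; the point is only that a partition key plus a sortable sort-key prefix answers "latest orders for a user" without a scan.

```python
def user_pk(user_id: str) -> str:
    return f"USER#{user_id}"

def order_sk(created_at: str, order_id: str) -> str:
    # ISO-8601 dates sort lexicographically, so sort-key order equals time order
    return f"ORDER#{created_at}#{order_id}"

class FakeTable:
    """In-memory stand-in for one DynamoDB table (illustration only)."""
    def __init__(self):
        self.items = {}

    def put_item(self, item: dict) -> None:
        # (PK, SK) is the primary key: same pair overwrites
        self.items[(item["PK"], item["SK"])] = item

    def query(self, pk, sk_prefix="", newest_first=False, limit=None):
        rows = [i for (p, s), i in self.items.items()
                if p == pk and s.startswith(sk_prefix)]
        rows.sort(key=lambda i: i["SK"], reverse=newest_first)
        return rows[:limit]

table = FakeTable()
table.put_item({"PK": user_pk("usr_123"), "SK": "PROFILE#usr_123", "name": "Alice"})
for ts, oid in [("2025-01-05", "o1"), ("2025-03-01", "o2"), ("2025-02-10", "o3")]:
    table.put_item({"PK": user_pk("usr_123"), "SK": order_sk(ts, oid), "order_id": oid})

latest = table.query(user_pk("usr_123"), sk_prefix="ORDER#", newest_first=True, limit=2)
print([o["order_id"] for o in latest])  # ['o2', 'o3']
```

The profile item shares the partition but is excluded by the `ORDER#` prefix, which is the same effect `begins_with` has in a real query.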
| Factor | Choose RDS/Aurora | Choose DynamoDB |
|---|---|---|
| Data model | Complex relations, JOINs, foreign keys | Key-value, documents, simple access patterns |
| Scale | <100k writes/sec, TB scale | Unlimited writes, single-digit ms at any scale |
| Query flexibility | Ad-hoc SQL, complex aggregations | Known access patterns only — no ad-hoc |
| Transactions | Full ACID, multi-table transactions | Limited (up to 100 items per transaction) |
| Cost at low load | Fixed ~$43+/month minimum | On-demand: pay per request, $0 when idle |
VPC, CloudFront, Route 53, ALB
🌐 VPC Architecture Best Practices
⚡ CloudFront CDN
450+ edge locations. Serves static assets from cache, proxies dynamic requests to your origin.
# Invalidate CloudFront cache after deploy
aws cloudfront create-invalidation \
--distribution-id E1234567890ABC \
--paths "/index.html" "/assets/*"
# Check distribution status
aws cloudfront get-distribution \
--id E1234567890ABC \
--query "Distribution.Status"
import boto3
route53 = boto3.client("route53")
# Update a DNS record (e.g. after new deployment)
route53.change_resource_record_sets(
HostedZoneId="Z1234567890ABC",
ChangeBatch={"Changes": [{
"Action": "UPSERT",
"ResourceRecordSet": {
"Name": "api.example.com",
"Type": "A",
"AliasTarget": {
"HostedZoneId": "Z35SXDOTRQ7X7K", # ALB hosted zone ID
"DNSName": "my-alb-1234567890.us-east-1.elb.amazonaws.com",
"EvaluateTargetHealth": True,
}
}
}]}
)
# ALB: check target health
elbv2 = boto3.client("elbv2")
# Get unhealthy targets
response = elbv2.describe_target_health(
TargetGroupArn="arn:aws:elasticloadbalancing:us-east-1:123456789:targetgroup/my-tg/abc"
)
unhealthy = [t for t in response["TargetHealthDescriptions"] if t["TargetHealth"]["State"] != "healthy"]
print(f"Unhealthy targets: {unhealthy}")
ECS, ECR, EKS
Run Docker containers without managing cluster infrastructure. ECS (simpler) or EKS (Kubernetes-compatible).
📦 ECR — Container Registry
Private Docker registry integrated with ECS/EKS. Supports image scanning, lifecycle policies, cross-region replication.
# Push image to ECR
aws ecr get-login-password --region us-east-1 \
| docker login --username AWS \
--password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com
docker build -t my-app .
docker tag my-app:latest \
123456789.dkr.ecr.us-east-1.amazonaws.com/my-app:latest
docker push \
123456789.dkr.ecr.us-east-1.amazonaws.com/my-app:latest
🚀 ECS Fargate
Serverless containers — no EC2 instances to manage. Define CPU/memory, ECS handles placement, scaling, updates.
☸️ EKS — Kubernetes
Managed Kubernetes control plane. Choose EKS when you need Helm, RBAC, custom controllers, or multi-cloud portability.
import boto3
ecs = boto3.client("ecs", region_name="us-east-1")
def deploy_new_version(cluster: str, service: str, new_image: str):
    """Rolling deploy: update task definition image → force new deployment."""
    # 1. Get current task definition
    svc = ecs.describe_services(cluster=cluster, services=[service])["services"][0]
    current_td_arn = svc["taskDefinition"]
    td = ecs.describe_task_definition(taskDefinition=current_td_arn)["taskDefinition"]
    # 2. Update the image in the container definition
    containers = td["containerDefinitions"]
    for c in containers:
        if c["name"] == "app":
            c["image"] = new_image  # e.g. "123456.dkr.ecr.../my-app:v2.0.1"
    # 3. Register new task definition revision
    new_td = ecs.register_task_definition(
        family=td["family"],
        containerDefinitions=containers,
        taskRoleArn=td["taskRoleArn"],
        executionRoleArn=td["executionRoleArn"],
        networkMode=td["networkMode"],
        requiresCompatibilities=td["requiresCompatibilities"],
        cpu=td["cpu"], memory=td["memory"],
    )
    new_td_arn = new_td["taskDefinition"]["taskDefinitionArn"]
    # 4. Update service with new task definition → rolling deploy begins
    ecs.update_service(
        cluster=cluster, service=service,
        taskDefinition=new_td_arn,
        forceNewDeployment=True,
        deploymentConfiguration={
            "maximumPercent": 200,         # allow double capacity during rollout
            "minimumHealthyPercent": 100,  # never go below 100% healthy
        }
    )
    print(f"Deploying {new_td_arn} to {service}")
deploy_new_version("production", "api-service", "123.dkr.ecr.us-east-1.amazonaws.com/api:v2.1")
IAM, Secrets Manager, KMS, WAF
Security is AWS's top priority — and your responsibility under the shared responsibility model. These are the non-negotiables.
🔑 IAM — Least Privilege is the Law
- → Never use root — create an admin IAM user, lock root with MFA
- → Use roles, not users — EC2/Lambda/ECS use IAM roles, not access keys
- → SCPs (Org level) — deny entire regions or dangerous actions across all accounts
- → Permission boundaries — cap max permissions a role can have
- → Access Analyzer — find over-permissioned policies and external access
- → Credential rotation — set Secrets Manager to auto-rotate every 30 days
🔐 Secrets Manager vs SSM Parameter Store
| Feature | Secrets Manager | SSM Param Store |
|---|---|---|
| Auto rotation | ✓ Built-in | ✗ Manual |
| DB integration | ✓ RDS/Redshift | ✗ No |
| Cost | $0.40/secret/mo | Free tier |
| Config values | ✗ Overkill | ✓ Great fit |
| Encryption | ✓ KMS default | ✓ SecureString |
import boto3, json
iam = boto3.client("iam")
sm = boto3.client("secretsmanager")
# ── Attach a least-privilege policy to a role ──
policy_doc = json.dumps({
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::my-bucket/uploads/*" # narrow scope!
}, {
"Effect": "Allow",
"Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
"Resource": "arn:aws:sqs:us-east-1:123456789:my-queue"
}]
})
policy = iam.create_policy(
PolicyName="AppLeastPrivilege",
PolicyDocument=policy_doc
)
iam.attach_role_policy(
RoleName="my-app-role",
PolicyArn=policy["Policy"]["Arn"]
)
# ── Read a secret (the right way in app code) ──
def get_secret(secret_name: str) -> dict:
    response = sm.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])
db_creds = get_secret("prod/postgres/main")
# {"username": "app_user", "password": "...", "host": "...", "port": 5432}
# ── Enable auto-rotation for a database secret ──
sm.rotate_secret(
SecretId="prod/postgres/main",
RotationLambdaARN="arn:aws:lambda:us-east-1:123:function:SecretsManagerRotation",
RotationRules={"AutomaticallyAfterDays": 30}
)
SQS, SNS, EventBridge
Decouple services so they don't need to talk to each other directly. If one goes down, messages queue up — nothing is lost.
import boto3, json
sqs = boto3.client("sqs", region_name="us-east-1")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
# ── Send a message ──
sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps({"job_id": "abc123", "type": "process_image"}),
MessageAttributes={
"priority": {"StringValue": "high", "DataType": "String"}
}
)
# ── Send batch (up to 10 messages, 256KB total) ──
sqs.send_message_batch(
QueueUrl=QUEUE_URL,
Entries=[{"Id": str(i), "MessageBody": json.dumps({"item": i})} for i in range(10)]
)
# ── Consumer (long polling — cheaper than short polling) ──
def process_queue():
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,  # batch up to 10
            WaitTimeSeconds=20,      # long poll: up to 20s wait
            VisibilityTimeout=300,   # 5 min to process before requeue
        )
        for msg in response.get("Messages", []):
            try:
                body = json.loads(msg["Body"])
                process_job(body)
                # Delete on success: removes from queue
                sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
            except Exception as e:
                print(f"Failed: {e}")
                # Don't delete → message returns to queue after VisibilityTimeout
                # After maxReceiveCount → moves to DLQ automatically
# ── SNS fanout: one publish → many subscribers ──
sns = boto3.client("sns")
sns.publish(
TopicArn="arn:aws:sns:us-east-1:123456789:order-events",
Subject="OrderPlaced",
Message=json.dumps({"order_id": "ord_789", "amount": 59.99}),
)
# All subscribers (SQS queues, Lambda, email) receive this simultaneously
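One detail that trips up SNS-to-SQS fanout consumers: unless raw message delivery is enabled on the subscription, the SQS body is an SNS JSON envelope and the published payload sits in its `Message` field as a string. A small sketch of unwrapping it follows; `unwrap_sns` is a helper name made up for this example.

```python
import json

def unwrap_sns(sqs_body: str) -> dict:
    """Return the published payload whether or not SNS wrapped it."""
    outer = json.loads(sqs_body)
    if isinstance(outer, dict) and outer.get("Type") == "Notification" and "Message" in outer:
        # SNS envelope: the real payload is a JSON string inside "Message"
        return json.loads(outer["Message"])
    return outer  # raw message delivery, or a message sent straight to SQS

payload = {"order_id": "ord_789", "amount": 59.99}
wrapped = json.dumps({
    "Type": "Notification",
    "TopicArn": "arn:aws:sns:us-east-1:123456789:order-events",
    "Subject": "OrderPlaced",
    "Message": json.dumps(payload),
})
print(unwrap_sns(wrapped))
print(unwrap_sns(json.dumps(payload)))
```

Both calls yield the same dict, so the consumer code does not need to know how the queue was subscribed.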
CloudWatch, X-Ray, Cost Explorer
You can't operate what you can't see. Instrument everything from day one.
📊 CloudWatch
Collect metrics, stream logs, set alarms, build dashboards — all in one place.
import boto3
from datetime import datetime, timedelta
cw = boto3.client("cloudwatch")
# Publish custom metric
cw.put_metric_data(
Namespace="MyApp",
MetricData=[{
"MetricName": "OrdersProcessed",
"Value": 42,
"Unit": "Count",
"Dimensions": [{"Name": "Environment", "Value": "production"}]
}]
)
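# Create alarm: alert if targets return more than 50 5XX responses
# in each of 2 consecutive 5-minute windows
cw.put_metric_alarm(
    AlarmName="HighErrorRate",
    MetricName="HTTPCode_Target_5XX_Count",
    Namespace="AWS/ApplicationELB",
    Dimensions=[{"Name": "LoadBalancer", "Value": "app/my-alb/1234567890abcdef"}],  # placeholder ALB ID
    Statistic="Sum",
    Period=300,  # 5 minute windows
    EvaluationPeriods=2,
    Threshold=50,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123:ops-alerts"]
)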
# Query logs with Insights
logs = boto3.client("logs")
logs.start_query(
logGroupName="/ecs/my-app",
startTime=int((datetime.now() - timedelta(hours=1)).timestamp()),
endTime=int(datetime.now().timestamp()),
queryString="fields @timestamp, @message | filter @message like /ERROR/ | limit 100"
)
💸 Cost Management
AWS bills are surprisingly complex. These tools keep costs under control.
- → AWS Budgets — alert at 80%/100% of monthly budget
- → Cost Explorer — visualise costs by service/tag/account
- → Trusted Advisor — finds idle resources, oversized instances
- → Savings Plans — commit to $X/hr, save up to 66%. Auto-applied.
- → Resource tagging — tag everything: `env`, `team`, `project` for cost allocation
- → Spot for batch — ML training, CI/CD, data processing = 90% savings
NAT Gateway is the #1 surprise bill. Each AZ's NAT Gateway costs $0.045/hr + $0.045/GB. VPC Endpoints for S3/DynamoDB eliminate NAT costs for those services.
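A quick back-of-the-envelope calculation using the two rates quoted above shows how the bill adds up. The traffic volumes and the 730-hour month are assumptions chosen for illustration, and the sketch assumes gateway endpoints for S3/DynamoDB carry no hourly or per-GB charge.

```python
HOURS_PER_MONTH = 730  # assumed average month length

def nat_monthly_cost(gateways: int, gb_processed: float,
                     hourly: float = 0.045, per_gb: float = 0.045) -> float:
    """Fixed hourly charge per gateway plus a per-GB data processing charge."""
    return gateways * hourly * HOURS_PER_MONTH + gb_processed * per_gb

# Hypothetical workload: 3 AZs, 2,000 GB/month through NAT, 1,500 GB of it S3 traffic
before = nat_monthly_cost(gateways=3, gb_processed=2000)
after = nat_monthly_cost(gateways=3, gb_processed=2000 - 1500)  # S3 moved to a gateway endpoint

print(f"before: ${before:.2f}")  # before: $188.55
print(f"after:  ${after:.2f}")   # after:  $121.05
```

The fixed hourly portion stays the same; only the per-GB portion shrinks when traffic bypasses the NAT Gateway.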
Which AWS Service Should I Use?
Store a file / image / video
Use S3. Add CloudFront CDN in front for global delivery. Use pre-signed URLs for private access.
Run a web server / API
Use ECS Fargate (Docker) for always-on, or Lambda + API Gateway for sporadic traffic.
Need a relational database
Use Aurora Serverless v2 for most apps. It auto-scales with load; at a 0.5 ACU minimum capacity it costs about $43/month even when idle.
Need millisecond lookups at scale
Use DynamoDB with on-demand pricing. Design your key schema around access patterns first.
Decouple two services
Use SQS for point-to-point, SNS for fanout, EventBridge for event routing rules.
Store API keys / DB passwords
Use Secrets Manager with auto-rotation. Never put credentials in code, environment variables, or S3.
Global low-latency delivery
Use CloudFront (CDN) + Route 53 (latency-based routing) + DynamoDB Global Tables.
Run ML training / GPU workload
Use EC2 Spot p4d/g5 instances (90% savings) or SageMaker for managed training pipelines.
Debug production issues
Use CloudWatch Logs Insights for log queries + X-Ray for distributed tracing across services.
Well-Architected Framework — 6 pillars: Operational Excellence · Security · Reliability · Performance Efficiency · Cost Optimization · Sustainability. Run the AWS Well-Architected Tool (free) on your architecture before going to production.
LangChain — The Mental Model
LangChain is a framework for building applications powered by LLMs. It provides primitives for connecting models, data, tools, and memory into pipelines. Everything composes.
# Core packages
pip install langchain langchain-openai langchain-anthropic langchain-community
pip install langchain-chroma langchain-qdrant langsmith
# Check versions
python -c "import langchain; print(langchain.__version__)"
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_community.chat_models import ChatOllama
# OpenAI
gpt4o = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
# Anthropic Claude
claude = ChatAnthropic(model="claude-3-7-sonnet-20250219", temperature=0)
# Local model via Ollama (no API cost); ChatOllama is the chat-model wrapper,
# so it returns messages with .content like the other two
llama = ChatOllama(model="llama3.1:8b")
# All three share the same interface, so swap freely
response = gpt4o.invoke("What is RAG?")
print(response.content)
# Streaming
for chunk in gpt4o.stream("Explain LangChain in 3 sentences"):
    print(chunk.content, end="", flush=True)
LCEL — LangChain Expression Language
LCEL uses the | operator to compose runnables into pipelines. Everything is a Runnable — it has invoke, stream, batch, and ainvoke. Chains are lazy — nothing runs until you call invoke.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
from pydantic import BaseModel
llm = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()
# ── Basic chain: prompt | model | parser ──
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Be concise."),
("human", "{question}")
])
chain = prompt | llm | parser
print(chain.invoke({"question": "What is LCEL?"}))
# ── Streaming ──
for chunk in chain.stream({"question": "Explain RAG in 3 steps"}):
    print(chunk, end="", flush=True)
# ── Batch (parallel) ──
results = chain.batch([
{"question": "What is S3?"},
{"question": "What is Lambda?"},
{"question": "What is DynamoDB?"},
], config={"max_concurrency": 5})
# ── Parallel branches (run two things at once, merge) ──
parallel = RunnableParallel({
"answer": chain,
"keywords": ChatPromptTemplate.from_template("List 5 keywords for: {question}") | llm | parser,
})
out = parallel.invoke({"question": "Explain GraphRAG"})
# out = {"answer": "...", "keywords": "..."}
# ── Structured output with Pydantic ──
class Movie(BaseModel):
    title: str
    year: int
    genre: str
structured_chain = prompt | llm.with_structured_output(Movie)
movie = structured_chain.invoke({"question": "Tell me about Inception"})
print(movie.title, movie.year) # Inception 2010
# ── Fallbacks ──
fast_chain = ChatPromptTemplate.from_template("{q}") | ChatOpenAI(model="gpt-4o-mini") | parser
strong_chain = ChatPromptTemplate.from_template("{q}") | ChatOpenAI(model="gpt-4o") | parser
chain_with_fallback = fast_chain.with_fallbacks([strong_chain])
# ── Pass-through + inject extra context ──
rag_chain = (
RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
| ChatPromptTemplate.from_template("Context: {context}\n\nQ: {question}")
| llm | parser
)
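To build intuition for how `|` composition and laziness can work, here is a toy sketch in plain Python. It is not LangChain's implementation: `Step` and `fake_llm` are invented for the illustration, and real Runnables add streaming, async, and config handling on top.

```python
class Step:
    """Toy runnable: wraps a function and composes with | (illustration only)."""
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # Composition builds a new Step; neither function runs yet
        return Step(lambda x: other.invoke(self.invoke(x)))

    def invoke(self, x):
        return self.fn(x)

    def batch(self, xs):
        return [self.invoke(x) for x in xs]

calls = []  # records every input the fake model sees

def fake_llm(text: str) -> str:
    calls.append(text)
    return text.upper() + "  "

prompt = Step(lambda d: f"Q: {d['question']}")
chain = prompt | Step(fake_llm) | Step(str.strip)

calls_before_invoke = list(calls)  # still empty: building the chain ran nothing
print(chain.invoke({"question": "what is lcel?"}))  # Q: WHAT IS LCEL?
```

The design choice worth noticing is that `__or__` returns another `Step`, which is why a composed chain can itself be piped, batched, or nested.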
Retrievers — Every Pattern You Need
A retriever accepts a string query and returns a list of Documents. LangChain has 50+ built-in retriever types. Here are the ones that matter.
🔍 VectorStore Retriever
The default — cosine similarity search against your vector store. Supports MMR (Maximum Marginal Relevance) for diversity.
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
vectorstore = Chroma(
collection_name="docs",
embedding_function=OpenAIEmbeddings()
)
# Standard similarity
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5}
)
# MMR — diverse results (less redundancy)
mmr_retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)
# Similarity + score threshold
thresh_retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.75, "k": 5}
)
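The MMR option trades relevance against redundancy. The following is a minimal pure-Python sketch of that selection rule, written for illustration and not taken from LangChain's source; the 2-D vectors are made-up toy embeddings. As with `lambda_mult` above, 1.0 means pure relevance and lower values favour diversity.

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def mmr(query, candidates, k, lambda_mult=0.5):
    """Greedily pick k candidate indices, penalising similarity to earlier picks."""
    selected = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:
        def score(i):
            relevance = cosine(query, candidates[i])
            redundancy = max((cosine(candidates[i], candidates[j]) for j in selected),
                             default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected

query = [1.0, 0.0]
candidates = [
    [1.0, 0.10],   # 0: most relevant
    [1.0, 0.12],   # 1: near-duplicate of 0
    [0.8, -0.60],  # 2: less relevant but different
]
print(mmr(query, candidates, k=2, lambda_mult=0.5))  # [0, 2]
print(mmr(query, candidates, k=2, lambda_mult=1.0))  # [0, 1]
```

With `lambda_mult=0.5` the near-duplicate is skipped in favour of the more distinct chunk; with `1.0` the result matches plain similarity ranking.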
🔀 MultiQueryRetriever
Auto-generates N query variants with an LLM, retrieves for each, deduplicates. Improves recall on vague queries.
from langchain.retrievers.multi_query import MultiQueryRetriever
mq_retriever = MultiQueryRetriever.from_llm(
retriever=vectorstore.as_retriever(),
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
include_original=True # keep original query too
)
# Automatically generates:
# "What is retrieval augmented generation?"
# "How does RAG work?"
# "Explain the RAG architecture"
# → deduplicates results → returns unique docs
docs = mq_retriever.invoke("Tell me about RAG")
🏠 ParentDocument Retriever
Indexes small child chunks for high-precision search, but returns the full parent document for rich context.
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Child chunks: small = good search precision
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
# Parent chunks: large = rich LLM context
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=InMemoryStore(), # swap for Redis in production
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
retriever.add_documents(docs)
# Retrieves small child → returns big parent
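The child-to-parent idea can be sketched without any vector store. In this illustration keyword overlap stands in for embedding similarity, and `split_words`, `build_child_index`, and `retrieve_parents` are hypothetical helpers, not LangChain APIs.

```python
def split_words(text: str, size: int) -> list[str]:
    words = text.split()
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]

def build_child_index(parents: dict[str, str], child_size: int = 5):
    """Index small chunks, each tagged with the id of its parent document."""
    return [(chunk, pid) for pid, text in parents.items()
            for chunk in split_words(text, child_size)]

def retrieve_parents(query: str, index, parents, k: int = 1) -> list[str]:
    terms = set(query.lower().split())
    # Rank child chunks by keyword overlap (a stand-in for vector similarity)
    ranked = sorted(index, key=lambda e: len(terms & set(e[0].lower().split())),
                    reverse=True)
    parent_ids = []
    for _chunk, pid in ranked[:k]:
        if pid not in parent_ids:  # several children may share one parent
            parent_ids.append(pid)
    return [parents[pid] for pid in parent_ids]

parents = {
    "p1": "RAG retrieves documents before generation so answers cite real sources and stay current",
    "p2": "Fine tuning bakes knowledge into model weights which is slow and costly to update",
}
index = build_child_index(parents)
print(retrieve_parents("retrieves documents", index, parents))
```

The search matches a five-word child chunk, but the caller gets the full parent text back, which is the behaviour the retriever above is built around.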
🔗 Ensemble Retriever (Hybrid)
Combines dense vector search with sparse BM25 keyword search using Reciprocal Rank Fusion. Best of both worlds.
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
# BM25 — keyword/lexical matching
bm25 = BM25Retriever.from_documents(docs)
bm25.k = 5
# Dense — semantic matching
dense = vectorstore.as_retriever(search_kwargs={"k": 5})
# 60% dense + 40% BM25 — RRF fusion
hybrid = EnsembleRetriever(
retrievers=[dense, bm25],
weights=[0.6, 0.4]
)
# Excels when users search by exact terms AND by meaning
result = hybrid.invoke("faiss cosine similarity ANN index")
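Weighted Reciprocal Rank Fusion itself is only a few lines. This sketch is an independent illustration of the formula, not LangChain's code; the constant `c=60` is the commonly used RRF default, and the document IDs are made up.

```python
from collections import defaultdict

def weighted_rrf(rankings, weights, c=60):
    """Fuse ranked lists: each doc scores weight / (rank + c) per list it appears in."""
    scores = defaultdict(float)
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (rank + c)
    return sorted(scores, key=scores.get, reverse=True)

dense_hits = ["d1", "d2", "d3"]  # semantic ranking
bm25_hits = ["d3", "d4", "d1"]   # keyword ranking
print(weighted_rrf([dense_hits, bm25_hits], weights=[0.6, 0.4]))
# ['d1', 'd3', 'd2', 'd4']
```

Documents that appear in both lists (`d1`, `d3`) rise above those found by only one retriever, and only ranks are used, so the two retrievers' raw scores never need to be on the same scale.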
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Step 1: Rewrite query using chat history ("it" → resolve pronoun)
contextualize_prompt = ChatPromptTemplate.from_messages([
("system", "Rewrite the user question to be standalone, using the chat history if needed."),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_prompt)
# Step 2: Answer with retrieved docs
qa_prompt = ChatPromptTemplate.from_messages([
("system", "Answer using only this context:\n\n{context}"),
MessagesPlaceholder("chat_history"),
("human", "{input}"),
])
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
# Step 3: Compose into conversational RAG chain
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
# Step 4: Add session-scoped memory
store = {} # session_id → ChatMessageHistory
def get_session_history(session_id: str):
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]
conversational_rag = RunnableWithMessageHistory(
rag_chain,
get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
output_messages_key="answer",
)
# Multi-turn conversation — "it" resolved from history
r1 = conversational_rag.invoke({"input": "What is RAG?"}, config={"configurable": {"session_id": "u1"}})
r2 = conversational_rag.invoke({"input": "How is it different from fine-tuning?"}, config={"configurable": {"session_id": "u1"}})
print(r2["answer"])
Essential Chain Patterns
📝 Summarization
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Map-Reduce: summarise each chunk → combine summaries
chain = load_summarize_chain(
llm,
chain_type="map_reduce", # or "stuff" (all at once), "refine"
verbose=True
)
splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=400)
docs = splitter.create_documents([very_long_text])
summary = chain.invoke(docs)
print(summary["output_text"])
🗃️ SQL Chain
from langchain_community.utilities import SQLDatabase
from langchain.chains import create_sql_query_chain
db = SQLDatabase.from_uri("postgresql://user:pass@host/mydb")
# Natural language → SQL (the chain only writes the query; db.run below executes it)
sql_chain = create_sql_query_chain(llm, db)
query = sql_chain.invoke({"question": "How many orders were placed last month?"})
# Generates: SELECT COUNT(*) FROM orders WHERE created_at > NOW() - INTERVAL '1 month'
result = db.run(query)
🔍 Self-Querying Retriever
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
# LLM writes the metadata filter automatically from natural language
metadata_field_info = [
AttributeInfo(name="source", description="The PDF file name", type="string"),
AttributeInfo(name="year", description="The publication year", type="integer"),
AttributeInfo(name="topic", description="Main topic", type="string"),
]
retriever = SelfQueryRetriever.from_llm(
llm, vectorstore, "Research papers on AI", metadata_field_info
)
# "Papers about RAG published after 2023" →
# filter: {"year": {"$gt": 2023}, "topic": "RAG"}
docs = retriever.invoke("Papers about RAG published after 2023")
🧮 Router Chain
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
# Route to different chains based on query type
rag_chain = build_rag_chain()
sql_chain = build_sql_chain()
general_chain = prompt | llm | parser
def route(info: dict):
    question = info["question"].lower()
    if any(w in question for w in ["database", "table", "sql", "query"]):
        return sql_chain
    elif any(w in question for w in ["document", "pdf", "report"]):
        return rag_chain
    return general_chain
# When a RunnableLambda returns a runnable, that runnable is invoked with the same input
router = RunnableLambda(route)
full_chain = {"question": RunnablePassthrough()} | router
# Pass the raw string: the dict step above wraps it as {"question": ...}
answer = full_chain.invoke("What's in the Q3 report?")
LangChain Agents
Agents let the LLM decide which tools to call and in what order. The model acts in a loop: Thought → Action → Observation → Thought → … → Final Answer.
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# ── Define custom tools with @tool decorator ──
@tool
def calculate_compound_interest(principal: float, rate: float, years: int) -> str:
    """Calculate compound interest. Args: principal (USD), rate (annual %), years."""
    amount = principal * (1 + rate / 100) ** years
    return f"${amount:,.2f} after {years} years"
@tool
def get_stock_price(ticker: str) -> str:
    """Get the current stock price for a given ticker symbol."""
    # In production: call a real finance API
    prices = {"AAPL": 189.30, "GOOGL": 173.50, "AMZN": 185.20}
    return f"{ticker}: ${prices.get(ticker.upper(), 'Not found')}"
@tool
def search_knowledge_base(query: str) -> str:
    """Search internal knowledge base for company-specific information."""
    docs = retriever.invoke(query)  # retriever: defined in the Retrievers section
    return "\n".join(d.page_content for d in docs[:3])
# ── Create agent ──
tools = [
    TavilySearchResults(max_results=3),
    calculate_compound_interest,
    get_stock_price,
    search_knowledge_base,
]
# A tool-calling agent is used because calculate_compound_interest takes several
# arguments; the text-based ReAct agent (create_react_agent) passes each tool a single string.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful financial assistant. Use tools when they help."),
    ("human", "{input}"),
    ("placeholder", "{agent_scratchpad}"),
])
agent = create_tool_calling_agent(ChatOpenAI(model="gpt-4o", temperature=0), tools, prompt)
executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,       # shows each tool call and its observation
    max_iterations=10,  # safety cap: prevents infinite loops
    handle_parsing_errors=True,
)
# Agent decides: search web → use calculator → search KB → synthesise
result = executor.invoke({
    "input": "If I invest $10,000 in AAPL today at their historical 15% annual growth, what do I have in 10 years?"
})
print(result["output"])
LangSmith tracing: Set LANGCHAIN_TRACING_V2=true and LANGCHAIN_API_KEY=ls__... in your env. Every chain call, token count, latency, and agent step is logged to the LangSmith dashboard — indispensable for debugging agents.
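Stripped of the framework, the agent loop is a short `for` loop with an iteration cap. The sketch below replaces the LLM with a scripted policy so it runs offline; `run_agent`, `scripted_policy`, and the step dictionaries are invented for this illustration and do not mirror `AgentExecutor`'s internals.

```python
def compound_interest(principal: float, rate: float, years: int) -> str:
    return f"${principal * (1 + rate / 100) ** years:,.2f}"

TOOLS = {"compound_interest": compound_interest}

def scripted_policy(question, scratchpad):
    """Stand-in for the LLM: call the calculator once, then answer."""
    if not scratchpad:
        return {"type": "action", "tool": "compound_interest",
                "args": {"principal": 10_000, "rate": 15, "years": 10}}
    last_observation = scratchpad[-1][2]
    return {"type": "final", "answer": f"About {last_observation}"}

def run_agent(policy, tools, question, max_iterations=10):
    scratchpad = []  # (tool, args, observation) triples the policy can read
    for _ in range(max_iterations):
        step = policy(question, scratchpad)             # decide
        if step["type"] == "final":
            return step["answer"]
        observation = tools[step["tool"]](**step["args"])  # act
        scratchpad.append((step["tool"], step["args"], observation))  # observe
    return "Stopped: iteration limit reached"

answer = run_agent(scripted_policy, TOOLS, "What is $10,000 at 15% for 10 years?")
print(answer)  # About $40,455.58
```

The iteration cap plays the same role as `max_iterations` above: a policy that never emits a final answer is cut off instead of looping forever.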
LangGraph — Why Graphs Beat Chains
LangChain chains are linear (A→B→C). LangGraph adds cycles, state, branching, and persistence — the four things real AI systems need.
❌ When LCEL Chains Break Down
- → Agent needs to loop back (retry, refine) — chains are one-way
- → Need human approval mid-execution — chains can't pause
- → Multiple parallel agents need to coordinate — chains are single-thread
- → State must persist across sessions — chains are stateless by default
- → Need to resume after failure — chains restart from scratch
✅ LangGraph Solutions
- → Cycles — edges can loop back to any previous node
- → Interrupt/Resume — pause at any node for human approval
- → Parallel nodes — run multiple agents simultaneously, fan-out/fan-in
- → Checkpointers — SQLite/Redis/Postgres persistence between runs
- → Time travel — replay execution from any past checkpoint
Graph Basics — State, Nodes, Edges
Every LangGraph app has three parts: a State (TypedDict that flows through the graph), Nodes (Python functions that update state), and Edges (connections, including conditional branches).
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# ── 1. Define State ──
# Annotated[list, operator.add] means: each node APPENDS to messages (not replaces)
class State(TypedDict):
messages: Annotated[list, operator.add]
query: str
documents: list
answer: str
# ── 2. Define Nodes (each is a plain Python function) ──
def retrieve(state: State) -> State:
"""Retrieve relevant documents."""
docs = retriever.invoke(state["query"])
return {"documents": docs}
def generate(state: State) -> State:
"""Generate answer from retrieved docs."""
context = "\n".join(d.page_content for d in state["documents"])
response = llm.invoke([
HumanMessage(content=f"Context:\n{context}\n\nQuestion: {state['query']}")
])
return {
"answer": response.content,
"messages": [AIMessage(content=response.content)]
}
def grade_answer(state: State) -> str:
"""Conditional edge: route based on answer quality."""
# Simple heuristic — in production use an LLM grader
if len(state["answer"]) < 50 or "I don't know" in state["answer"]:
return "retry" # loop back to retrieve; rewrite the query and cap retries, otherwise the same docs come back and the loop only stops at the recursion limit
return "done"
# ── 3. Build Graph ──
builder = StateGraph(State)
builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)
# Edges
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")
# Conditional edge: done → END, retry → retrieve (loop!)
builder.add_conditional_edges(
"generate",
grade_answer,
{"done": END, "retry": "retrieve"}
)
# ── 4. Compile with checkpointer (enables persistence + interrupt) ──
memory = MemorySaver() # in-memory; swap for SqliteSaver / RedisSaver in prod
graph = builder.compile(checkpointer=memory)
# ── 5. Run with thread_id (each thread = isolated conversation) ──
config = {"configurable": {"thread_id": "user-123-session-1"}}
result = graph.invoke({"query": "What is CRAG?", "messages": [], "documents": [], "answer": ""}, config)
print(result["answer"])
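# Resume the same thread later: the checkpointer restores the prior state.
# Pass only new input; "messages" uses an add reducer, so re-sending result["messages"] would duplicate the history.
result2 = graph.invoke({"query": "How does it compare to Self-RAG?", "messages": [], "documents": [], "answer": ""}, config)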
# Print ASCII representation
graph.get_graph().print_ascii()
# Export as PNG (draw_mermaid_png renders through the mermaid.ink web API by default, so it needs network access)
from IPython.display import Image
Image(graph.get_graph().draw_mermaid_png())
# Or get Mermaid markdown
print(graph.get_graph().draw_mermaid())
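A minimal sketch of what the `Annotated[list, operator.add]` reducer does, in plain Python with no LangGraph dependency. `REDUCERS` and `apply_update` are hypothetical stand-ins for the library's internal merge step, not its actual API: keys with a reducer are combined with the old value, every other key is overwritten.

```python
import operator

# Hypothetical stand-in for the graph's merge step (not LangGraph's real API).
REDUCERS = {"messages": operator.add}

def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the current state."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        # Reducer keys are combined with the old value; others are replaced.
        merged[key] = reducer(state.get(key, []), value) if reducer else value
    return merged

state = {"messages": ["q1"], "answer": ""}
state = apply_update(state, {"messages": ["a1"], "answer": "first"})
state = apply_update(state, {"answer": "second"})
print(state)

# Re-sending the existing history through an add reducer duplicates it.
duplicated = apply_update(state, {"messages": state["messages"]})
print(len(duplicated["messages"]))  # 4
```

This is why a resumed thread should receive only the new input: anything sent to a reducer key is appended to what the checkpointer already holds.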
The 5 Essential LangGraph Patterns
Pattern 1 — ReAct Agent Loop Beginner
The classic agent loop: reason → act → observe → repeat until done. Built into LangGraph as a prebuilt.
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
@tool
def search(query: str) -> str:
"""Search the web for current information."""
return tavily.invoke(query)
@tool
def calculator(expression: str) -> str:
"""Evaluate a math expression."""
return str(eval(expression)) # eval is unsafe on untrusted input; use a restricted evaluator such as numexpr in production
# create_react_agent is the fastest path — wraps the full graph
agent = create_react_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=[search, calculator],
checkpointer=MemorySaver(), # memory across turns
prompt="You are a research assistant. Use tools to answer accurately."
)
# Stream intermediate steps (agent reasoning is visible)
for event in agent.stream(
{"messages": [HumanMessage(content="What's 15% of NVIDIA's current market cap?")]},
config={"configurable": {"thread_id": "t1"}},
stream_mode="values"
):
event["messages"][-1].pretty_print()
Pattern 2 — Human-in-the-Loop Intermediate
Pause execution before a dangerous action (send email, delete record, execute code), wait for human approval, then resume.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
class ApprovalState(TypedDict):
messages: Annotated[list, operator.add]
action: str
approved: bool
def plan_action(state):
response = llm.invoke(state["messages"])
return {"action": response.content, "messages": [response]}
def execute_action(state):
# Only runs after human approval
print(f"Executing: {state['action']}")
result = dangerous_api_call(state["action"])
return {"messages": [AIMessage(content=f"Done: {result}")]}
builder = StateGraph(ApprovalState)
builder.add_node("plan", plan_action)
builder.add_node("execute", execute_action)
builder.add_edge(START, "plan")
builder.add_edge("plan", "execute") # interrupted before this
builder.add_edge("execute", END)
# interrupt_before=["execute"] — graph pauses BEFORE running "execute"
graph = builder.compile(
checkpointer=MemorySaver(),
interrupt_before=["execute"] # ← the magic
)
config = {"configurable": {"thread_id": "approval-flow-1"}}
# Run until the interrupt
graph.invoke({"messages": [HumanMessage(content="Delete all staging data")]}, config)
# ↑ Graph pauses here. Show plan to human...
state = graph.get_state(config)
print("Pending action:", state.values["action"])
# Human approves — resume by passing None (continue from checkpoint)
# To reject, update state instead
graph.invoke(None, config) # resumes from interrupt checkpoint
Pattern 3 — Parallel Fan-Out / Fan-In Intermediate
Run multiple agents or research branches simultaneously, then merge results. LangGraph handles the synchronisation.
from langgraph.graph import StateGraph, START, END
class ResearchState(TypedDict):
topic: str
web_results: str
db_results: str
summary: str
def web_researcher(state):
"""Searches the web — runs in parallel with db_researcher."""
results = tavily.invoke(state["topic"])
return {"web_results": str(results)}
def db_researcher(state):
"""Queries internal vector DB — runs in parallel."""
docs = retriever.invoke(state["topic"])
return {"db_results": "\n".join(d.page_content for d in docs)}
def synthesiser(state):
"""Waits for both branches, then merges."""
prompt = f"Web:\n{state['web_results']}\n\nInternal docs:\n{state['db_results']}\n\nSynthesise:"
answer = llm.invoke([HumanMessage(content=prompt)])
return {"summary": answer.content}
builder = StateGraph(ResearchState)
builder.add_node("web_researcher", web_researcher)
builder.add_node("db_researcher", db_researcher)
builder.add_node("synthesiser", synthesiser)
# Fan-out: START → both branches simultaneously
builder.add_edge(START, "web_researcher")
builder.add_edge(START, "db_researcher")
# Fan-in: both must complete before synthesiser runs
builder.add_edge("web_researcher", "synthesiser")
builder.add_edge("db_researcher", "synthesiser")
builder.add_edge("synthesiser", END)
graph = builder.compile()
result = graph.invoke({"topic": "LangGraph vs CrewAI", "web_results": "", "db_results": "", "summary": ""})
print(result["summary"])
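The fan-out/fan-in shape can be sketched without LangGraph using a thread pool. This is an illustration only: `web_branch`, `db_branch` and `fan_out_fan_in` are hypothetical names, and the branches return canned strings instead of calling a search tool or retriever.

```python
from concurrent.futures import ThreadPoolExecutor

def web_branch(topic: str) -> dict:
    # Placeholder for a web search call.
    return {"web_results": f"web notes on {topic}"}

def db_branch(topic: str) -> dict:
    # Placeholder for a vector-store lookup.
    return {"db_results": f"internal notes on {topic}"}

def fan_out_fan_in(topic: str) -> dict:
    state = {"topic": topic}
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Fan-out: both branches start immediately.
        futures = [pool.submit(branch, topic) for branch in (web_branch, db_branch)]
        # Fan-in: result() blocks until each branch has finished.
        for future in futures:
            state.update(future.result())
    state["summary"] = f"{state['web_results']} | {state['db_results']}"
    return state

result = fan_out_fan_in("LangGraph")
print(result["summary"])
```

Each branch writes a different key, so the merge is trivial. When parallel branches write the same key, the state needs a reducer (as with `messages` earlier) to say how the values combine.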
Pattern 4 — Sub-graphs Advanced
Compose complex graphs from smaller reusable graphs. A sub-graph is compiled independently and added as a node in a parent graph.
# Sub-graph: a reusable RAG pipeline
rag_builder = StateGraph(State)
rag_builder.add_node("retrieve", retrieve)
rag_builder.add_node("generate", generate)
rag_builder.add_edge(START, "retrieve")
rag_builder.add_edge("retrieve", "generate")
rag_builder.add_edge("generate", END)
rag_graph = rag_builder.compile()
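# Parent graph uses the sub-graph as a node
# (ParentState, classify_query and sql_agent_graph are defined elsewhere; a compiled
# sub-graph added as a node must share state keys with the parent, e.g. "query")
parent_builder = StateGraph(ParentState)
parent_builder.add_node("classify", classify_query)
parent_builder.add_node("rag", rag_graph) # ← sub-graph as node
parent_builder.add_node("sql_agent", sql_agent_graph) # ← another sub-graph
parent_builder.add_edge(START, "classify") # entry point; compile() fails without one
parent_builder.add_conditional_edges(
"classify",
lambda s: s["query_type"],
{"document": "rag", "database": "sql_agent"}
)
parent_builder.add_edge("rag", END)
parent_builder.add_edge("sql_agent", END)
parent_graph = parent_builder.compile()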
Memory — Short-Term, Long-Term, Semantic
LangGraph has a first-class memory system. Thread-scoped memory (checkpointer) for conversational context, and cross-thread memory (store) for user profiles and facts.
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.store.memory import InMemoryStore
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import SystemMessage
import json
# ── Thread-scoped memory (per conversation) ──
# SqliteSaver: persists to disk — survives restarts
db_path = "checkpoints.db"
with SqliteSaver.from_conn_string(db_path) as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-42-chat-7"}}
graph.invoke({"messages": [HumanMessage("My name is Alice")]}, config)
# Restart app — state still there!
graph.invoke({"messages": [HumanMessage("What's my name?")]}, config)
# → "Your name is Alice" ✓
# ── Cross-thread memory (user profile, facts) ──
store = InMemoryStore() # swap for PostgresStore in production
def chatbot_with_memory(state, config, *, store):
user_id = config["configurable"]["user_id"]
namespace = ("users", user_id, "memories")
# Recall existing memories for this user
memories = store.search(namespace, query=state["messages"][-1].content, limit=3)
memory_text = "\n".join(m.value["fact"] for m in memories) if memories else ""
system = f"""You are a personal assistant.
User facts you know:
{memory_text}"""
response = llm.invoke([SystemMessage(content=system)] + state["messages"])
# Extract and save new facts mentioned in this message
new_facts = extract_facts(state["messages"][-1].content)
for fact in new_facts:
store.put(namespace, key=fact[:50], value={"fact": fact})
return {"messages": [response]}
# "My dog is called Max" → stored as fact
# Next session: "What's my dog's name?" → "Max" (retrieved from store)
| Memory Type | Scope | Implementation | Use Case |
|---|---|---|---|
| In-context | Single response | messages list in state | Conversational context window |
| Thread (short-term) | One conversation | Checkpointer (SQLite/Redis) | Multi-turn chat, resume sessions |
| Cross-thread (long-term) | Across all chats | Store (Postgres/Pinecone) | User preferences, facts, profiles |
| Semantic | Cross-thread | Store + embedding search | "What did the user say about X?" |
Multi-Agent Systems
Orchestrate multiple specialised agents — a Supervisor routes tasks to Worker agents, each with their own tools and expertise.
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from typing import Literal
# ── Create specialised worker agents ──
researcher = create_react_agent(
ChatOpenAI(model="gpt-4o"), tools=[tavily_search, vectorstore_search],
prompt="You are a research specialist. Find accurate information."
)
coder = create_react_agent(
ChatOpenAI(model="gpt-4o"), tools=[python_repl, code_interpreter],
prompt="You are a Python expert. Write clean, working code."
)
analyst = create_react_agent(
ChatOpenAI(model="gpt-4o"), tools=[sql_tool, chart_tool],
prompt="You are a data analyst. Query data and visualise insights."
)
# ── Supervisor state ──
class SupervisorState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str
# ── Supervisor node: LLM decides which agent to call next ──
SUPERVISOR_PROMPT = """You are a supervisor managing: researcher, coder, analyst, FINISH.
Given the conversation, decide which agent should act next, or FINISH if done.
Respond with just the agent name."""
def supervisor(state: SupervisorState) -> SupervisorState:
response = llm.invoke([
SystemMessage(content=SUPERVISOR_PROMPT),
*state["messages"]
])
return {"next_agent": response.content.strip()}
def route(state) -> Literal["researcher", "coder", "analyst", END]:
next_a = state["next_agent"]
if next_a == "FINISH": return END
return next_a
# ── Build supervisor graph ──
builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor)
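# create_react_agent returns the full message history; keep only the final message so the add reducer does not duplicate earlier turns
builder.add_node("researcher", lambda s: {"messages": researcher.invoke({"messages": s["messages"]})["messages"][-1:]})
builder.add_node("coder", lambda s: {"messages": coder.invoke({"messages": s["messages"]})["messages"][-1:]})
builder.add_node("analyst", lambda s: {"messages": analyst.invoke({"messages": s["messages"]})["messages"][-1:]})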
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route)
# All workers report back to supervisor after each turn
for worker in ["researcher", "coder", "analyst"]:
builder.add_edge(worker, "supervisor")
graph = builder.compile(checkpointer=MemorySaver())
# Complex task: supervisor orchestrates multiple agents automatically
result = graph.invoke({
"messages": [HumanMessage(content=
"Research LangGraph's architecture, write a Python example of a multi-agent system, "
"and analyse what percentage of GitHub repos use LangChain vs LangGraph"
)],
"next_agent": ""
}, config={"configurable": {"thread_id": "complex-task-1"}})
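The supervisor above trusts the LLM to reply with exactly one agent name. Replies such as "Coder." or "Next: analyst" would reach `route` as unknown node names. One possible hardening step, sketched here with a hypothetical `normalise_route` helper (not part of LangGraph), is to map the raw reply onto the allowed names and fall back to FINISH when it is ambiguous.

```python
ALLOWED = ("researcher", "coder", "analyst")  # worker names from the graph above

def normalise_route(raw: str, default: str = "FINISH") -> str:
    """Map a free-text supervisor reply onto a known agent name or FINISH."""
    cleaned = raw.strip().strip(".\"'`").lower()
    if cleaned == "finish":
        return "FINISH"
    if cleaned in ALLOWED:
        return cleaned
    # Fall back to a word-level scan, e.g. "Next: coder".
    words = [w.strip(".,:;!?\"'`") for w in cleaned.split()]
    hits = [name for name in ALLOWED if name in words]
    # Exactly one mention is accepted; zero or several is treated as ambiguous.
    return hits[0] if len(hits) == 1 else default

for reply in (" Coder.\n", "Next: analyst", "researcher or coder?", "FINISH"):
    print(repr(reply), "->", normalise_route(reply))
```

In the supervisor node this would wrap the model output, for example `{"next_agent": normalise_route(response.content)}`. Structured output from the model is another common way to get the same guarantee.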
🆚 LangGraph vs CrewAI
LangGraph = low-level graph control. CrewAI = high-level role-based abstraction. LangGraph wins on flexibility; CrewAI wins on onboarding speed.
🔍 LangSmith Debugging
Every node execution, token count, and latency are traced. Filter by thread_id, tag chains by use-case, compare runs side-by-side.
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="ls__..."
export LANGCHAIN_PROJECT="my-agent"
# All runs now appear in the LangSmith UI (smith.langchain.com)
🚀 LangGraph Platform
Deploy LangGraph graphs as production APIs. Built-in: horizontal scaling, streaming, cron jobs, webhooks, Studio UI for visual debugging.
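# Run locally, then build an image for deployment
pip install -U "langgraph-cli[inmem]"
langgraph dev # local dev server with Studio UI
langgraph up # full API server in Docker
langgraph build -t my-agent # Docker image for self-hosting; managed cloud deployments are created from a Git repo in the LangSmith UI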
📋 LangChain vs LangGraph — When to Use Which
| Situation | Use | Why |
|---|---|---|
| Simple Q&A, summarisation, translation | LCEL Chain | Linear, no state needed |
| RAG with a single retrieval step | LCEL + Retriever | create_retrieval_chain covers it |
| Agent that uses tools in a loop | LangGraph ReAct | Cycles + state + interrupt support |
| Multi-turn chat with memory | LangGraph + Checkpointer | Thread-scoped persistence built-in |
| Human approval before action | LangGraph interrupt_before | LCEL can't pause mid-execution |
| Multiple agents collaborating | LangGraph Supervisor | Fan-out/fan-in, sub-graphs, coordination |
| Long-running background task | LangGraph Platform | Durable execution, webhooks, streaming |
The Complete AI Professional Stack
Think in layers. Each layer depends on the ones below it. Most practitioners skip foundations and wonder why they can't debug models or design architectures from first principles.
✅ Already In This Guide
🗺️ What This Roadmap Adds
Mathematics for AI — The Non-Negotiables
You don't need a PhD, but you need enough math to read papers, understand what's actually happening inside models, and debug when things go wrong.
📐 Linear Algebra (Most Important)
import numpy as np
# Vectors = embeddings. Dot product = similarity.
v1 = np.array([0.2, 0.8, 0.5]) # "king" embedding
v2 = np.array([0.1, 0.9, 0.4]) # "queen" embedding
cosine_sim = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
print(f"Cosine similarity: {cosine_sim:.4f}") # 0.9846
# Matrix multiplication = linear layer (weight matrix W applied to input x)
W = np.random.randn(768, 3072) # FFN expand layer (GPT-2 style)
x = np.random.randn(768) # token embedding
out = W.T @ x # → 3072-dim hidden state
# SVD = the factorisation behind PCA and low-rank compression
U, S, Vt = np.linalg.svd(W, full_matrices=False)
# Keep top-r singular values = best rank-r approximation (LoRA applies the same low-rank idea to weight updates)
r = 8
W_approx = U[:, :r] @ np.diag(S[:r]) @ Vt[:r, :]
m, n = W.shape
print(f"Rank-{r} compression: {W.shape} → saved {1 - r*(m + n)/(m*n):.1%} params") # 98.7%
# Eigenvalues = used in PCA and spectral analysis; W @ W.T is symmetric, so use eigh
eigenvalues, eigenvectors = np.linalg.eigh(W @ W.T)
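The parameter saving from a rank-r factorisation is plain arithmetic: a dense m × n matrix stores m·n numbers, while the two factors (m × r and r × n) store r·(m + n). A small sketch, with `low_rank_params` as a hypothetical helper name:

```python
def low_rank_params(m: int, n: int, r: int) -> tuple[int, int]:
    """Parameter counts for a dense m x n matrix vs. a rank-r factorisation."""
    dense = m * n
    factored = r * (m + n)  # (m x r) factor plus (r x n) factor
    return dense, factored

dense, factored = low_rank_params(768, 3072, 8)
print(f"dense={dense:,} factored={factored:,} saved={1 - factored / dense:.1%}")
```

For the 768 × 3072 matrix used above, rank 8 keeps 30,720 of 2,359,296 parameters. How much accuracy survives depends on how quickly the singular values decay; a random Gaussian matrix like `W` compresses poorly, trained weight updates often compress much better.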
🎲 Probability & Statistics
import torch
import torch.nn.functional as F
# Softmax = turning logits into probabilities (used everywhere)
logits = torch.tensor([2.0, 1.0, 0.1])
probs = F.softmax(logits, dim=-1)
# tensor([0.6590, 0.2424, 0.0986]) — sums to 1
# Cross-entropy loss = how LLMs are trained (predict next token)
# True label = index 0 ("the" was the next word)
target = torch.tensor([0])
loss = F.cross_entropy(logits.unsqueeze(0), target)
print(f"Loss: {loss.item():.4f}") # -log(0.659) = 0.417
# KL Divergence = how DPO/RLHF penalise diverging from reference model
p = torch.softmax(torch.tensor([3.0, 1.0, 0.5]), dim=0) # policy
q = torch.softmax(torch.tensor([2.5, 1.2, 0.3]), dim=0) # reference
kl = (p * (p / q).log()).sum()
print(f"KL(p||q) = {kl.item():.4f}")
# Temperature sampling (controls randomness of generation)
temp = 0.7
scaled_logits = logits / temp
probs_temp = F.softmax(scaled_logits, dim=-1)
# Lower temp → more deterministic. As temp → 0 this approaches greedy (argmax); temp = 0 itself would divide by zero.
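To see the effect of temperature without PyTorch, here is a dependency-free softmax sketch. The `softmax` helper is written for this example; it rejects non-positive temperatures because greedy decoding is an argmax, not a division by zero.

```python
import math

def softmax(logits: list[float], temperature: float = 1.0) -> list[float]:
    if temperature <= 0:
        raise ValueError("temperature must be > 0; use argmax for greedy decoding")
    scaled = [x / temperature for x in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.1]
for t in (0.5, 1.0, 2.0):
    print(t, [round(p, 3) for p in softmax(logits, t)])
```

Lower temperatures concentrate probability on the top logit, higher temperatures flatten the distribution towards uniform.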
∂ Calculus — Gradients & Backprop
import torch
import torch.nn.functional as F # needed for F.mse_loss below
# Automatic differentiation — how PyTorch computes gradients
x = torch.tensor(2.0, requires_grad=True)
y = x ** 3 + 2 * x # y = x³ + 2x
y.backward() # compute dy/dx via chain rule
print(x.grad) # tensor(14.) = 3x² + 2 at x=2
# A simple neural network: forward → loss → backward → step
model = torch.nn.Linear(10, 1)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)
for step in range(100):
x = torch.randn(32, 10) # batch of 32
y_true = torch.randn(32, 1)
y_pred = model(x)
loss = F.mse_loss(y_pred, y_true)
optimizer.zero_grad() # clear old gradients
loss.backward() # compute new gradients
optimizer.step() # update weights (plain SGD would be w = w - lr * grad; AdamW also rescales by running moment estimates)
# Key optimizers to know:
# SGD → simple, good for vision models
# Adam → adaptive lr, great for NLP
# AdamW → Adam + decoupled weight decay → better regularisation (the usual choice for LLMs)
# Adafactor → memory-efficient, used for very large models
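A quick way to build trust in autograd is to check it against a finite-difference estimate. The sketch below does that for the same function, y = x³ + 2x, in plain Python; `numerical_grad` is a helper defined only for this example.

```python
def f(x: float) -> float:
    return x ** 3 + 2 * x

def numerical_grad(fn, x: float, eps: float = 1e-5) -> float:
    """Central difference: (f(x + eps) - f(x - eps)) / (2 * eps)."""
    return (fn(x + eps) - fn(x - eps)) / (2 * eps)

analytic = 3 * 2.0 ** 2 + 2          # dy/dx = 3x^2 + 2 at x = 2
numeric = numerical_grad(f, 2.0)
print(f"analytic={analytic} numeric={numeric:.6f}")
```

The same check scales to real models: perturb one weight, recompute the loss, and compare against the gradient that `backward()` reported for that weight.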
📚 Resources to Learn These
- 3Blue1Brown — "Essence of Linear Algebra" + "Neural Networks" YouTube series. Best visual intuition available.
- fast.ai — Practical Deep Learning for Coders. Top-down, code-first, free.
- Andrej Karpathy — makemore + nanoGPT — Build a GPT from scratch in pure PyTorch. Best LLM intuition builder.
- CS229 (Stanford) — ML theory, publicly available. Covers probability + optimisation rigorously.
- Dive into Deep Learning (d2l.ai) — Free textbook with code. Covers everything from perceptrons to transformers.
Transformer Internals & Deep Learning Architecture
To architect and debug LLM systems at a professional level, you must understand what actually happens inside a transformer — not just the API.
import torch
import torch.nn.functional as F
import math
def scaled_dot_product_attention(Q, K, V, mask=None):
"""
The heart of every transformer.
Q, K, V: [batch, heads, seq_len, head_dim]
"""
d_k = Q.size(-1)
# Attention scores: how much each token should attend to each other token
scores = Q @ K.transpose(-2, -1) / math.sqrt(d_k) # [B, H, seq, seq]
# Causal mask: decoder can't see future tokens (autoregressive)
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
# Convert scores to probabilities
attn_weights = F.softmax(scores, dim=-1)
# Weighted sum of values
return attn_weights @ V, attn_weights # [B, H, seq, head_dim]
class MultiHeadAttention(torch.nn.Module):
def __init__(self, d_model=768, n_heads=12):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.W_q = torch.nn.Linear(d_model, d_model, bias=False)
self.W_k = torch.nn.Linear(d_model, d_model, bias=False)
self.W_v = torch.nn.Linear(d_model, d_model, bias=False)
self.W_o = torch.nn.Linear(d_model, d_model, bias=False)
def forward(self, x, mask=None):
B, T, C = x.shape # batch, seq_len, d_model
# Project → split into heads
def split_heads(w): return w.view(B, T, self.n_heads, self.d_head).transpose(1, 2)
Q, K, V = split_heads(self.W_q(x)), split_heads(self.W_k(x)), split_heads(self.W_v(x))
# Attention
out, weights = scaled_dot_product_attention(Q, K, V, mask)
# Merge heads → project out
out = out.transpose(1, 2).contiguous().view(B, T, C)
return self.W_o(out), weights
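# Flash Attention (built into PyTorch 2.0+): same math, but tiled so the full [seq, seq] score matrix is never materialised
# Prefer this in production over the reference implementation above (Q, K, V as produced inside forward; needs CUDA and fp16/bf16 inputs):
from torch.nn.attention import SDPBackend, sdpa_kernel # PyTorch 2.3+; replaces the deprecated torch.backends.cuda.sdp_kernel
with sdpa_kernel(SDPBackend.FLASH_ATTENTION):
    out = F.scaled_dot_product_attention(Q, K, V, is_causal=True)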
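The causal mask is easier to see on a tiny example. This plain-Python sketch applies a row-wise softmax in which query i only sees keys j ≤ i, which gives the same weights as filling future positions with -inf before the softmax. `causal_attention_weights` is a name made up for this illustration and works on a single head's score matrix.

```python
import math

def causal_attention_weights(scores: list[list[float]]) -> list[list[float]]:
    """Row-wise softmax where query i may only attend to keys j <= i."""
    weights = []
    for i, row in enumerate(scores):
        visible = row[: i + 1]                      # drop future positions
        peak = max(visible)
        exps = [math.exp(s - peak) for s in visible]
        total = sum(exps)
        # Future positions get weight 0, exactly as exp(-inf) would.
        weights.append([e / total for e in exps] + [0.0] * (len(row) - i - 1))
    return weights

scores = [[0.0, 5.0, 5.0],
          [1.0, 1.0, 9.0],
          [0.0, 0.0, 0.0]]
attn = causal_attention_weights(scores)
for row in attn:
    print([round(w, 3) for w in row])
```

The large scores in the upper-right corner have no effect: the first token can only attend to itself, whatever the raw scores say about later tokens.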
| Architecture | Key Idea | Best For | Examples |
|---|---|---|---|
| GPT (Decoder-only) | Causal attention, predicts next token | Generation, chat, code | GPT-4, Claude, Llama 3 |
| BERT (Encoder-only) | Bidirectional attention, masked LM | Classification, embeddings | BERT, RoBERTa, E5 |
| T5 (Encoder-Decoder) | Seq2seq with cross-attention | Translation, summarisation | T5, FLAN-T5, mT5 |
| MoE (Mixture of Experts) | Route each token to top-k expert FFNs | Scale efficiency | Mixtral, GPT-4 (rumoured), Gemini |
| Mamba (SSM) | State space, linear time complexity | Very long contexts | Mamba, Jamba |
| Diffusion | Learn to denoise from Gaussian noise | Image/video/audio gen | SD 3, DALL-E 3, Sora |
Fine-Tuning LLMs — SFT, LoRA, DPO, RLHF
Fine-tuning adapts a pre-trained LLM to your domain or behaviour. In 2025–2026 a common recipe combines parameter-efficient LoRA training with DPO preference alignment.
🔧 LoRA — Low-Rank Adaptation
Instead of updating all 7B parameters, train two small matrices A and B whose product approximates the weight update. Reduces trainable params by 99%+.
import torch # needed for torch.float16 below
from peft import LoraConfig, get_peft_model, TaskType
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from trl import SFTTrainer
# Load base model in 4-bit (QLoRA = LoRA + 4-bit quantisation)
from transformers import BitsAndBytesConfig
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.float16,
)
model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Meta-Llama-3-8B",
quantization_config=bnb_config,
device_map="auto"
)
# Apply LoRA adapters (only these are trained: well under 1% of params)
lora_config = LoraConfig(
r=16, # rank — higher = more capacity, more params
lora_alpha=32, # scaling factor
target_modules=["q_proj", "v_proj", "k_proj", "o_proj"], # which layers
lora_dropout=0.05,
task_type=TaskType.CAUSAL_LM,
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()
# With this config on Llama-3-8B: 13,631,488 trainable params, roughly 0.17% of the ~8.0B total
# Train with SFTTrainer (Supervised Fine-Tuning)
trainer = SFTTrainer(
model=model,
train_dataset=my_dataset, # {"text": "prompt + completion"}
dataset_text_field="text", # recent TRL versions take this and max_seq_length via SFTConfig instead
max_seq_length=2048,
args=TrainingArguments(
output_dir="./lora-finetuned",
per_device_train_batch_size=4,
gradient_accumulation_steps=4, # effective batch = 16
warmup_steps=100,
num_train_epochs=3,
learning_rate=2e-4,
fp16=True,
logging_steps=10,
),
)
trainer.train()
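What the adapter computes can be sketched in a few lines of plain Python: the frozen weight W is left alone and a scaled low-rank product is added, y = Wx + (alpha / r) · B(Ax). The helpers `matvec`, `lora_forward` and `lora_param_count` are written for this illustration and are not PEFT's API. B starts at zero, the standard LoRA initialisation, so the adapted layer is identical to the base layer before training.

```python
def matvec(M, v):
    return [sum(m * x for m, x in zip(row, v)) for row in M]

def lora_forward(W, A, B, x, alpha: float, r: int):
    """y = W x + (alpha / r) * B (A x); W is frozen, only A and B are trained."""
    base = matvec(W, x)
    update = matvec(B, matvec(A, x))
    return [b + (alpha / r) * u for b, u in zip(base, update)]

def lora_param_count(d_in: int, d_out: int, r: int) -> int:
    return r * (d_in + d_out)          # A is r x d_in, B is d_out x r

d_out, d_in, r = 4, 3, 2
W = [[0.1 * (i + j) for j in range(d_in)] for i in range(d_out)]
A = [[0.5, -0.5, 0.25], [1.0, 0.0, -1.0]]     # r x d_in, stands in for random init
B = [[0.0] * r for _ in range(d_out)]          # d_out x r, zero init
x = [1.0, 2.0, 3.0]

print(lora_forward(W, A, B, x, alpha=4, r=r) == matvec(W, x))   # no-op at init
print(lora_param_count(4096, 4096, 16), "adapter params vs", 4096 * 4096, "frozen")
```

For a 4096 × 4096 projection at r=16 the adapter adds 131,072 parameters next to 16,777,216 frozen ones, which is where the sub-1% trainable fraction comes from.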
🎯 DPO — Direct Preference Optimisation
Aligns model behaviour to human preferences without a separate reward model. Train on (prompt, chosen, rejected) triples. Simpler than PPO-based RLHF and often comparably effective.
from trl import DPOTrainer, DPOConfig
# DPO dataset: preferred vs rejected responses
# {
# "prompt": "Explain quantum computing",
# "chosen": "Quantum computing uses qubits...", ← preferred
# "rejected": "Quantum computing is too complex...", ← rejected
# }
dpo_trainer = DPOTrainer(
model=sft_model, # start from your SFT model
ref_model=None, # frozen reference; with None, TRL copies the starting (SFT) model
args=DPOConfig(
beta=0.1, # KL penalty — how far from ref model
max_prompt_length=512,
max_length=1024,
output_dir="./dpo-aligned",
per_device_train_batch_size=4,
learning_rate=5e-5,
num_train_epochs=1,
),
train_dataset=preference_dataset,
tokenizer=tokenizer, # renamed to processing_class in recent TRL versions
)
dpo_trainer.train()
# RLHF vs DPO:
# RLHF: train reward model → PPO (more moving parts, harder to stabilise, more compute)
# DPO: direct training from preference data (simpler and more stable; quality is often comparable)
# → DPO is a common first choice for preference alignment
Dataset sizes: SFT needs ~1K–10K high-quality examples. DPO needs ~500–5K preference pairs. Quality >> quantity — curate carefully.
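The DPO objective for a single preference pair fits in a few lines. Given summed log-probabilities of the chosen and rejected responses under the policy and the frozen reference, the loss is -log σ(β · [(log π_chosen - log π_ref_chosen) - (log π_rejected - log π_ref_rejected)]). The sketch below uses made-up log-probabilities; `dpo_loss` is a helper written for this example, not TRL's API.

```python
import math

def dpo_loss(pi_chosen, pi_rejected, ref_chosen, ref_rejected, beta=0.1):
    """DPO loss for one pair, given summed sequence log-probabilities."""
    margin = (pi_chosen - ref_chosen) - (pi_rejected - ref_rejected)
    # -log(sigmoid(z)) written as log(1 + exp(-z))
    return math.log1p(math.exp(-beta * margin))

at_init = dpo_loss(-12.0, -15.0, -12.0, -15.0)    # policy == reference
improved = dpo_loss(-10.0, -18.0, -12.0, -15.0)   # policy favours chosen more
worse = dpo_loss(-14.0, -13.0, -12.0, -15.0)      # policy favours rejected more
print(round(at_init, 4), round(improved, 4), round(worse, 4))
```

At initialisation the margin is zero and the loss is log 2. It falls as the policy raises the chosen response relative to the reference, and `beta` controls how strongly a given margin is rewarded.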
Advanced Prompt Engineering Must Know
This goes far beyond "write a better prompt": these techniques can affect output quality as much as fine-tuning does.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# ── Chain-of-Thought (CoT) — "think step by step" unlocks reasoning ──
cot_prompt = ChatPromptTemplate.from_messages([("human",
"{question}\n\nThink step by step before giving your final answer."
)])
# Often improves accuracy on multi-step math/logic tasks; the gain depends on the model and benchmark
# ── Tree of Thought (ToT) — explore multiple reasoning paths ──
tot_prompt = """Explore 3 different approaches to this problem:
{problem}
For each approach:
1. Describe the approach
2. Evaluate if it leads to the correct answer
3. Score confidence 1-10
Then select the best approach and give the final answer."""
# ── Self-Consistency — sample N times, majority vote ──
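def self_consistent_answer(question: str, n: int = 5) -> str:
    from collections import Counter
    # Sampling needs temperature > 0; the temperature=0 llm above would return n near-identical answers
    sampler = ChatOpenAI(model="gpt-4o", temperature=0.7)
    answers = [sampler.invoke(question).content for _ in range(n)]
    # Parse final answers (extract_answer is a user-supplied parser) and take majority vote
    final_answers = [extract_answer(a) for a in answers]
    return Counter(final_answers).most_common(1)[0][0]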
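The voting step of self-consistency can be tested offline with canned completions. In this sketch `extract_answer` is one hypothetical way to parse answers (it assumes the model was told to end with "Final Answer: ..."), and `majority_vote` also returns the agreement rate, which is a cheap confidence signal.

```python
import re
from collections import Counter

def extract_answer(completion: str) -> str:
    """Hypothetical parser: take the text after the last 'Final Answer:'."""
    matches = re.findall(r"final answer:\s*(.+)", completion, flags=re.IGNORECASE)
    return matches[-1].strip().rstrip(".").lower() if matches else ""

def majority_vote(completions: list[str]) -> tuple[str, float]:
    """Return the most common parsed answer and the share of samples agreeing."""
    votes = Counter(a for a in map(extract_answer, completions) if a)
    if not votes:
        return "", 0.0
    answer, count = votes.most_common(1)[0]
    return answer, count / len(completions)

samples = [
    "17 * 3 = 51. Final Answer: 51",
    "Thought: multiply first. Final Answer: 51.",
    "I think it is 41. Final Answer: 41",
    "No idea.",
]
print(majority_vote(samples))
```

Unparseable samples count against the agreement rate but cannot win the vote, so a low rate flags questions where the model is guessing.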
# ── ReAct — Reason + Act (the prompt behind agents) ──
react_prompt = """Answer the question using this format:
Thought: I need to think about what to do
Action: tool_name[input]
Observation: (result from tool)
... repeat as needed ...
Final Answer: your answer
Question: {question}"""
# ── System prompt engineering for Claude ──
SYSTEM = """You are a senior financial analyst.
Rules:
- Only use data provided in the context
- Express uncertainty explicitly
- Always cite the source paragraph
Structure your response with these sections:
## Analysis
## Key Risks
## Recommendation (Buy/Hold/Sell)
"""
Inference Optimization — Speed & Cost at Scale
A model that's too slow or too expensive doesn't ship. These techniques can reduce inference cost by 5–20×, in most cases with little or no loss in accuracy.
⚡ vLLM — PagedAttention
Manages the KV cache like virtual memory, in fixed-size blocks. The vLLM authors report up to 24× higher throughput than HuggingFace transformers. A widely used choice for self-hosted LLM serving.
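To make the virtual-memory analogy concrete, here is a toy block allocator. `PagedKVCache` is a hypothetical class for illustration only: it tracks which physical blocks each sequence owns and stores no tensors, and vLLM's real implementation differs in many details.

```python
class PagedKVCache:
    """Toy block allocator: each sequence owns a list of fixed-size blocks."""

    def __init__(self, num_blocks: int, block_size: int = 16):
        self.block_size = block_size
        self.free = list(range(num_blocks))
        self.tables: dict[str, list[int]] = {}   # seq_id -> physical block ids
        self.lengths: dict[str, int] = {}        # seq_id -> tokens stored

    def append_token(self, seq_id: str) -> None:
        n = self.lengths.get(seq_id, 0)
        if n % self.block_size == 0:             # current block is full (or none yet)
            if not self.free:
                raise MemoryError("no free KV blocks; caller must preempt or queue")
            self.tables.setdefault(seq_id, []).append(self.free.pop())
        self.lengths[seq_id] = n + 1

    def release(self, seq_id: str) -> None:
        """Return a finished sequence's blocks to the free pool."""
        self.free.extend(self.tables.pop(seq_id, []))
        self.lengths.pop(seq_id, None)

cache = PagedKVCache(num_blocks=4, block_size=16)
for _ in range(17):
    cache.append_token("a")
for _ in range(5):
    cache.append_token("b")
print(cache.tables, "free blocks:", len(cache.free))
```

Memory is claimed one block at a time as a sequence grows, so at most one partly filled block per sequence is wasted, instead of reserving the maximum context length up front for every request.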
# Serve Llama 3 with vLLM
pip install vllm
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Meta-Llama-3-8B-Instruct \
--tensor-parallel-size 2 \
--gpu-memory-utilization 0.9 \
--max-model-len 8192
# OpenAI-compatible API at :8000
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"meta-llama/Meta-Llama-3-8B-Instruct",
"messages":[{"role":"user","content":"Hello"}]}'
🗜️ Quantization
Reduce weight precision from FP32 → FP16 → INT8 → INT4. 4-bit weights need about 4× less VRAM than FP16 (8× less than FP32). Use GPTQ or AWQ for post-training quantisation.
from transformers import AutoTokenizer
from awq import AutoAWQForCausalLM
# AWQ 4-bit quantisation (a strong quality/speed tradeoff)
model_id = "meta-llama/Meta-Llama-3-8B"
model = AutoAWQForCausalLM.from_pretrained(model_id)
tokenizer = AutoTokenizer.from_pretrained(model_id) # quantize() needs it for calibration data
quant_config = {"zero_point": True, "q_group_size": 128, "w_bit": 4, "version": "GEMM"}
model.quantize(tokenizer, quant_config=quant_config)
model.save_quantized("llama3-8b-awq-4bit")
# 8B model: ~16GB of weights in FP16 → ~4GB in INT4, which fits on a single 4090
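The core idea of weight quantisation can be shown with simple absmax round-to-nearest. This is deliberately not AWQ or GPTQ, which add calibration data and smarter scaling on top; `quantize_absmax` and `dequantize` are names made up for this sketch.

```python
def quantize_absmax(weights: list[float], bits: int = 4):
    """Symmetric round-to-nearest quantisation with one scale per group."""
    qmax = 2 ** (bits - 1) - 1                      # 7 for INT4, 127 for INT8
    scale = max(abs(w) for w in weights) / qmax or 1.0
    q = [max(-qmax, min(qmax, round(w / scale))) for w in weights]
    return q, scale

def dequantize(q: list[int], scale: float) -> list[float]:
    return [v * scale for v in q]

weights = [0.42, -1.30, 0.07, 0.88, -0.55]
for bits in (8, 4):
    q, scale = quantize_absmax(weights, bits)
    err = max(abs(w - d) for w, d in zip(weights, dequantize(q, scale)))
    print(f"INT{bits}: q={q} max_error={err:.4f}")
```

The worst-case error per weight is half a quantisation step (scale / 2), which is why smaller groups (the `q_group_size` above) and fewer outliers per group improve quality.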
🔮 Speculative Decoding
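A small draft model proposes N tokens speculatively, and the large model verifies them all in one forward pass, keeping the longest prefix it agrees with. Speedups of 2–3× are typical, with the same output distribution as the large model alone.
from transformers import AutoModelForCausalLM, AutoTokenizer
# Built into HuggingFace transformers as "assisted generation"
target_id = "meta-llama/Meta-Llama-3-70B" # large verifier
draft_id = "meta-llama/Meta-Llama-3-8B" # small drafter (must share the tokenizer)
tokenizer = AutoTokenizer.from_pretrained(target_id)
target = AutoModelForCausalLM.from_pretrained(target_id, device_map="auto")
draft = AutoModelForCausalLM.from_pretrained(draft_id, device_map="auto")
inputs = tokenizer("Explain transformers in detail", return_tensors="pt").to(target.device)
out = target.generate(**inputs, assistant_model=draft, max_new_tokens=256)
print(tokenizer.decode(out[0], skip_special_tokens=True))
# Speedup depends on how often the drafter's tokens are accepted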
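The accept/reject loop is easier to follow on toy models. In this sketch `target_next` and `draft_next` are made-up deterministic functions standing in for the large and small models, and decoding is greedy. Real implementations verify all drafted positions in one batched forward pass and use a probabilistic acceptance rule when sampling.

```python
def target_next(prefix: list[int]) -> int:
    """Toy 'large model': deterministic next token."""
    return (sum(prefix) * 31 + len(prefix)) % 10

def draft_next(prefix: list[int]) -> int:
    """Toy 'small model': agrees with the target except when len(prefix) % 4 == 3."""
    guess = target_next(prefix)
    return (guess + 1) % 10 if len(prefix) % 4 == 3 else guess

def greedy_decode(prefix: list[int], n: int) -> list[int]:
    out = list(prefix)
    for _ in range(n):
        out.append(target_next(out))
    return out

def speculative_decode(prefix: list[int], n: int, k: int = 4):
    out, target_passes = list(prefix), 0
    goal = len(prefix) + n
    while len(out) < goal:
        # 1) Draft proposes k tokens autoregressively (cheap).
        draft: list[int] = []
        for _ in range(k):
            draft.append(draft_next(out + draft))
        # 2) Target checks positions 0..k; a real model does this in one pass.
        target_passes += 1
        for i in range(k + 1):
            expected = target_next(out + draft[:i])
            if i < k and draft[i] == expected:
                continue                          # draft token accepted
            out.extend(draft[:i] + [expected])    # first mismatch, or bonus token
            break
    return out[:goal], target_passes

spec_out, passes = speculative_decode([1, 2], n=12)
print(spec_out == greedy_decode([1, 2], 12), "target passes:", passes)
```

The output is identical to decoding with the target alone, but the target runs 4 verification passes instead of 12 single-token steps. The saving shrinks as the drafter's agreement rate drops.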
| Technique | Speedup | Quality Loss | VRAM Reduction (vs FP32) | Best For |
|---|---|---|---|---|
| FP16 / BF16 | 1.5–2× | Negligible | 2× | All production deployments |
| INT8 (bitsandbytes) | 1.5× | <1% | 4× | Inference on smaller GPUs |
| AWQ/GPTQ INT4 | 2–3× | ~1–2% | 8× | Edge / cost-sensitive |
| vLLM PagedAttention | 10–24× | None | Same | High-throughput serving |
| Speculative Decoding | 2–3× | None | Slightly more | Latency-sensitive single requests |
| Flash Attention 2 | 2–4× | None | 10× less activation | Training & long-context inference |
| Pruning + Distillation | 2–10× | 3–8% | 2–10× | Edge/mobile deployment |
MLOps — Taking Models to Production Reliably
📊 Weights & Biases (W&B)
A widely used platform for experiment tracking, model versioning, and hyperparameter sweeps. Metrics from every training run are logged with a few lines of code.
import wandb
wandb.init(project="llm-finetuning", name="lora-r16-lr2e-4", config={
"model": "llama3-8b", "r": 16, "learning_rate": 2e-4, "epochs": 3
})
for epoch in range(3):
for batch in dataloader:
loss = train_step(batch)
wandb.log({"train/loss": loss, "epoch": epoch}) # live dashboard
# Log final model as artifact (versioned)
artifact = wandb.Artifact("lora-adapter", type="model")
artifact.add_dir("./lora-finetuned/")
wandb.log_artifact(artifact)
wandb.finish()
# Hyperparameter sweep (Bayesian optimisation)
sweep_config = {
"method": "bayes",
"metric": {"goal": "minimize", "name": "val/loss"},
"parameters": {
"learning_rate": {"min": 1e-5, "max": 5e-4},
"r": {"values": [8, 16, 32, 64]},
"batch_size": {"values": [4, 8, 16]},
}
}
sweep_id = wandb.sweep(sweep_config, project="lora-sweep")
wandb.agent(sweep_id, function=train, count=20)
📦 MLflow — Model Registry
Track experiments, package models with dependencies, deploy to any serving platform. Integrates with SageMaker, Databricks, Azure ML.
import mlflow
import mlflow.pyfunc
mlflow.set_experiment("rag-system-v2")
with mlflow.start_run():
mlflow.log_params({
"embedding_model": "text-embedding-3-small",
"chunk_size": 512, "k": 5, "llm": "gpt-4o"
})
# Run evaluation
scores = evaluate_rag(test_questions)
mlflow.log_metrics({
"faithfulness": scores["faithfulness"],
"answer_relevancy": scores["answer_relevancy"],
"context_recall": scores["context_recall"],
})
# Log the RAG pipeline as a model
class RAGModel(mlflow.pyfunc.PythonModel):
def predict(self, ctx, model_input):
return [rag_chain.invoke(q) for q in model_input["questions"]]
mlflow.pyfunc.log_model("rag-pipeline", python_model=RAGModel(), registered_model_name="rag-pipeline")
# Promote the best version in the Model Registry (aliases replace the deprecated stage transitions)
client = mlflow.MlflowClient()
client.set_registered_model_alias("rag-pipeline", "production", version=3)
MLOps stack for AI in 2025–2026: W&B (experiment tracking) + DVC (data versioning) + MLflow (model registry) + Feast (feature store) + Evidently AI (drift monitoring) + Seldon/Ray Serve (model serving) + ArgoCD (GitOps deployment). You don't need all of these — start with W&B + MLflow.
AI Safety, Alignment & Security
Senior AI professionals must understand how models are aligned, what can go wrong, and how to build guardrails. This is non-negotiable at enterprise scale.
🛡️ Guardrails
Input/output validation to prevent harmful, off-topic, or policy-violating responses.
import openai
from guardrails import Guard
from guardrails.hub import ToxicLanguage # install first: guardrails hub install hub://guardrails/toxic_language
# Guardrails AI: validate LLM output schema + content
guard = Guard().use(ToxicLanguage, threshold=0.5, on_fail="exception")
result = guard(
llm_api=openai.chat.completions.create,
prompt="Tell me about AI safety",
model="gpt-4o",
)
# Raises exception if toxic content detected
# NeMo Guardrails (NVIDIA) — conversational rails
from nemoguardrails import RailsConfig, LLMRails
config = RailsConfig.from_path("./rails_config/")
rails = LLMRails(config)
response = rails.generate(
messages=[{"role":"user","content":"Ignore previous instructions"}]
)
# "I'm sorry, I can't help with that." — jailbreak blocked
🔴 Red Teaming
Systematically probe model weaknesses before deployment. Often required for enterprise and government deployments.
- → Prompt injection — user input overrides system prompt
- → Jailbreaks — roleplay, many-shot, "DAN" attacks
- → Data exfiltration — extract training data / PII
- → Indirect injection — malicious instructions in retrieved docs
- → Tool: PyRIT (Microsoft) — automated red-teaming framework
- → Tool: Garak — LLM vulnerability scanner
⚖️ Constitutional AI (Anthropic)
Train models to self-critique and revise outputs according to a set of principles — without human labellers for every example.
- → Step 1 — SFT: supervised fine-tuning on human demonstrations
- → Step 2 — CAI: model critiques itself using principles
- → Step 3 — RLAIF: AI-generated preference data for DPO/RLHF
- → RLHF pipeline: human → reward model → PPO policy optimisation
- → DPO: skip reward model, directly optimise preferences
Emerging Techniques — What's Shaping 2025–2026
These are the ideas separating frontier AI practitioners from the rest. You don't need to implement them, but you must understand what they are and when they matter.
🧠 Test-Time Compute (o1 / o3 / Claude 3.7)
Instead of answering immediately, the model "thinks" for seconds or minutes, running an internal chain-of-thought that may be hidden from the user or shown only in summary. More compute at inference tends to mean better answers on hard problems.
import anthropic
client = anthropic.Anthropic()
# Extended thinking — Claude 3.7 Sonnet
response = client.messages.create(
model="claude-3-7-sonnet-20250219",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000 # ← how much "thinking" to allow
},
messages=[{"role": "user", "content":
"Prove that there are infinitely many prime numbers."
}]
)
for block in response.content:
if block.type == "thinking":
print("THINKING:", block.thinking[:200]) # internal reasoning
else:
print("ANSWER:", block.text) # final response
# Key insight: accuracy on hard benchmarks tends to grow roughly with the log of test-time compute
# The gain per 10× compute varies widely by model and benchmark, so measure it on your own tasks
🔀 Mixture of Experts (MoE)
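Instead of activating all model weights for every token, a router sends each token to a small subset (typically 2–8) of N "expert" FFN layers. The model keeps the capacity of its full parameter count while paying per-token compute only for the active experts. Mixtral 8×7B, for example, activates about 13B of its 47B parameters per token.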
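import torch
import torch.nn as nn
class FFN(nn.Module):
    """Position-wise feed-forward block used as a single expert."""
    def __init__(self, d_model, d_hidden=None):
        super().__init__()
        d_hidden = d_hidden or 4 * d_model
        self.net = nn.Sequential(
            nn.Linear(d_model, d_hidden), nn.GELU(), nn.Linear(d_hidden, d_model))
    def forward(self, x):
        return self.net(x)
class MoELayer(nn.Module):
    """Sparse MoE: each token routed to top-k experts."""
    def __init__(self, d_model=1024, n_experts=8, top_k=2):
        super().__init__()
        self.router = nn.Linear(d_model, n_experts, bias=False)
        self.experts = nn.ModuleList([FFN(d_model) for _ in range(n_experts)])
        self.top_k = top_k
    def forward(self, x):
        B, T, D = x.shape
        router_logits = self.router(x)  # [B, T, n_experts]
        router_weights = torch.softmax(router_logits, dim=-1)
        topk_weights, topk_idx = router_weights.topk(self.top_k, dim=-1)  # select top-k
        # Renormalise so the selected experts' weights sum to 1 for each token
        topk_weights = topk_weights / topk_weights.sum(dim=-1, keepdim=True)
        # Only the selected experts run, which is where the compute savings come from
        output = torch.zeros_like(x)
        for k in range(self.top_k):
            expert_idx = topk_idx[..., k]  # which expert for each token
            expert_weight = topk_weights[..., k].unsqueeze(-1)
            # Route each token to its expert (simplified: loops over experts)
            for e_idx, expert in enumerate(self.experts):
                mask = (expert_idx == e_idx)
                if mask.any():
                    output[mask] += expert_weight[mask] * expert(x[mask])
        return output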
# Mixtral 8×7B: 8 experts, 2 active per token
# → 47B total params but only 13B active per forward pass
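The two Mixtral figures above are enough for a rough back-of-the-envelope split between shared and per-expert parameters. The sketch below assumes every non-expert weight (attention, embeddings, router) is shared and that all experts are the same size; `split_moe_params` is an illustrative helper, not part of any library.

```python
def split_moe_params(total_b, active_b, n_experts, top_k):
    """Solve total = shared + n_experts * e and active = shared + top_k * e."""
    per_expert = (total_b - active_b) / (n_experts - top_k)
    shared = active_b - top_k * per_expert
    return shared, per_expert

# Figures in billions, taken from the comment above (47B total, 13B active)
shared, per_expert = split_moe_params(total_b=47, active_b=13, n_experts=8, top_k=2)
active_fraction = 13 / 47
print(f"shared ~ {shared:.2f}B, per expert ~ {per_expert:.2f}B, active ~ {active_fraction:.0%}")
```

Under these assumptions only a little over a quarter of the weights participate in any one token's forward pass, although all of them still have to sit in memory.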
🌊 State Space Models — Mamba
Self-attention is O(n²) in sequence length; SSMs are O(n). The Mamba paper reports Transformer-level quality on several language, audio, and genomics benchmarks with up to 5× higher inference throughput, and the advantage grows with sequence length.
Key idea: Compress the entire context history into a fixed-size hidden state using selective state space. The "selection" mechanism lets the model decide what to remember — like a learnable RNN but parallelisable during training. Not yet replacing transformers but strong for document processing, time series, genomics.
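A toy sketch of the selection idea: one pass over the sequence, a fixed-size state (here a single float), and an input-dependent gate that decides how much of the state to overwrite. This is not Mamba's actual parameterisation; the gate parameters `w` and `b` stand in for learned weights and are assumptions.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def selective_scan(xs, w=4.0, b=-2.0):
    """O(n) recurrence: h_t = (1 - g_t) * h_{t-1} + g_t * x_t, with g_t depending on x_t."""
    h, ys = 0.0, []
    for x in xs:
        # Large inputs open the gate (write to memory); near-zero inputs mostly keep the old state
        g = sigmoid(w * abs(x) + b)
        h = (1.0 - g) * h + g * x
        ys.append(h)
    return ys

signal = [1.0, 0.0, 0.0, 0.0, 0.0]   # one salient token followed by filler
ys = selective_scan(signal)
print([round(y, 3) for y in ys])
```

The state holds on to the salient first input across the filler tokens, and memory use does not grow with sequence length.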
🌍 Multimodal Frontier — 2026
The future is natively multimodal. Models that see, hear, generate images and video — trained end-to-end, not bolted together.
import anthropic, base64
client = anthropic.Anthropic()
# Claude 3.7 — vision + text in one call
with open("chart.png", "rb") as f:
img_b64 = base64.standard_b64encode(f.read()).decode()
response = client.messages.create(
model="claude-3-7-sonnet-20250219",
max_tokens=1024,
messages=[{"role": "user", "content": [
{"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": img_b64}},
{"type": "text", "text": "Extract all numbers from this chart as JSON"}
]}]
)
🏆 High-End AI Professional — Complete Skill Checklist
| Area | Junior | Mid | Senior / Architect |
|---|---|---|---|
| Foundations | Can use APIs | Understands embeddings + attention | Reads papers, implements from scratch |
| RAG | Basic pipeline | HyDE, hybrid, reranking | GraphRAG, Agentic, production eval |
| Fine-Tuning | Runs SFT notebook | LoRA/QLoRA on custom data | DPO alignment, dataset curation |
| Inference | Uses hosted APIs | Self-hosts with vLLM | Quantization, speculative decoding, Flash Attention |
| Agents | LangChain ReAct | LangGraph multi-step | Multi-agent, HITL, sub-graphs |
| MLOps | Saves model locally | W&B + MLflow tracking | CI/CD for ML, feature stores, drift monitoring |
| Cloud / Infra | Uses managed APIs | ECS + RDS + SQS | GPU clusters, Kubernetes, multi-region |
| Safety | Adds basic filters | Guardrails + red team basics | Constitutional AI, RLHF, enterprise governance |
| Evaluation | Manual testing | RAGAS metrics | LLM-as-judge, adversarial evals, benchmarks |
| Frontier | Reads announcements | Uses new models via API | Understands MoE/SSM/test-time compute tradeoffs |
Best learning path: Build nanoGPT (Karpathy) → fine-tune Llama 3 with LoRA → build a production RAG system → add LangGraph agents → deploy on AWS ECS with W&B monitoring → study one frontier paper per week from arxiv.org/list/cs.LG/recent. Repeat. Ship things.
AI Frontiers
Computer Vision · Diffusion Models · Speech AI · Reinforcement Learning · Model Context Protocol — the remaining pillars every senior AI engineer must command.
Computer Vision — CNNs to Foundation Models
Three revolutions: hand-crafted features → CNNs → Vision Transformers (ViTs). Today's frontier: foundation models that unify detection, segmentation, and generation in one architecture.
YOLOv8 — Real-Time Object Detection
Single-pass detection, typically well under 50 ms per image on a modern GPU for the smaller variants. A good default for production systems that need speed.
from ultralytics import YOLO
# Inference — models: yolov8n (fastest) → yolov8x (most accurate)
model = YOLO("yolov8n.pt")
results = model("image.jpg", conf=0.5, iou=0.45)
for r in results:
boxes = r.boxes.xyxy.cpu().numpy() # [x1,y1,x2,y2]
clsids = r.boxes.cls.cpu().numpy()
confs = r.boxes.conf.cpu().numpy()
for box, cls, conf in zip(boxes, clsids, confs):
print(f"{model.names[int(cls)]} {conf:.2f} @ {box}")
# Fine-tune on custom dataset (custom.yaml defines class paths)
model.train(data="custom.yaml", epochs=100, imgsz=640, batch=16)
# Export for edge deployment
model.export(format="onnx") # or "tflite", "coreml"
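The `iou=0.45` argument above is the overlap threshold for non-maximum suppression (NMS). A minimal pure-Python sketch of what that threshold does, using hypothetical boxes in the same [x1, y1, x2, y2] format; production detectors use vectorised, per-class versions of this.

```python
def iou(a, b):
    """Intersection-over-union of two [x1, y1, x2, y2] boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0

def nms(boxes, scores, iou_thresh=0.45):
    """Keep the highest-scoring box, drop any box overlapping a kept one above the threshold."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    for i in order:
        if all(iou(boxes[i], boxes[j]) <= iou_thresh for j in keep):
            keep.append(i)
    return keep

boxes = [[0, 0, 10, 10], [1, 1, 11, 11], [20, 20, 30, 30]]
scores = [0.9, 0.8, 0.7]
kept = nms(boxes, scores)
print(kept)  # indices of the surviving boxes
```

Raising the threshold keeps more overlapping boxes (useful for crowded scenes); lowering it suppresses duplicates more aggressively.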
SAM 2 — Segment Anything
Meta's foundation model for zero-shot segmentation. Prompt with points, boxes, or masks. Works on video too (track across frames).
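from sam2.build_sam import build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor
import numpy as np
from PIL import Image
# build_sam2 takes a model config plus a checkpoint path (names assume the official repo layout)
model = build_sam2("sam2_hiera_l.yaml", "checkpoints/sam2_hiera_large.pt")
predictor = SAM2ImagePredictor(model)
image_np = np.array(Image.open("image.jpg").convert("RGB")) # HWC uint8 RGB
predictor.set_image(image_np)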
# Prompt with a foreground point
masks, scores, _ = predictor.predict(
point_coords=np.array([[500, 375]]),
point_labels=np.array([1]), # 1=fg, 0=bg
multimask_output=True,
)
best_mask = masks[scores.argmax()] # bool H×W array
# Or prompt with a bounding box
masks, _, _ = predictor.predict(
box=np.array([100, 200, 400, 600]), # x1,y1,x2,y2
multimask_output=False,
)
CLIP — Vision-Language Alignment
Contrastive learning on 400M image-text pairs. Zero-shot classification, image search, visual RAG retrieval.
from transformers import CLIPModel, CLIPProcessor
from PIL import Image
model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
proc = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
image = Image.open("photo.jpg")
labels = ["a dog running", "a cat sleeping", "a car driving"]
inputs = proc(text=labels, images=image,
return_tensors="pt", padding=True)
outputs = model(**inputs)
probs = outputs.logits_per_image.softmax(dim=1)
for label, p in zip(labels, probs[0]):
print(f"{p:.3f} {label}")
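What `logits_per_image.softmax(dim=1)` computes can be sketched without the model: cosine similarity between the image embedding and each text embedding, scaled by a temperature, then a softmax over labels. The 3-dimensional embeddings below are toy values, and `logit_scale=100.0` is an assumption standing in for CLIP's learned temperature.

```python
import math

def normalize(v):
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def zero_shot_probs(image_emb, text_embs, logit_scale=100.0):
    """Softmax over scaled cosine similarities between one image and several captions."""
    img = normalize(image_emb)
    logits = [logit_scale * sum(a * b for a, b in zip(img, normalize(t))) for t in text_embs]
    peak = max(logits)                       # subtract the max for numerical stability
    exps = [math.exp(l - peak) for l in logits]
    total = sum(exps)
    return [e / total for e in exps]

image_emb = [0.9, 0.1, 0.0]                  # toy image embedding
text_embs = [[1.0, 0.0, 0.0],                # toy caption embeddings, one per label
             [0.0, 1.0, 0.0],
             [0.0, 0.0, 1.0]]
probs = zero_shot_probs(image_emb, text_embs)
print([round(p, 4) for p in probs])
```

The same similarity score, without the softmax, is what drives image search and visual RAG retrieval: embed the query text once and rank stored image embeddings by cosine similarity.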
Vision Model Comparison
| Model | Task | Best For |
|---|---|---|
| YOLOv8n/x | Detection | Real-time edge to server |
| SAM 2 | Segmentation | Any-object zero-shot masking |
| ViT-L/16 | Classification | High-accuracy image cls |
| CLIP L/14 | Vision-Language | Zero-shot, semantic search |
| DINO v2 | Dense features | Self-supervised repr. |
| Florence-2 | Universal | Caption + detect + ground |
| GPT-4V / Claude 3.5 | VLM | Complex visual reasoning |
Diffusion Models — How AI Generates Images & Video
Diffusion models learn to reverse a noise process. Three components: VAE compresses images to latent space, UNet/Transformer denoises iteratively, CLIP encoder conditions on text.
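The noise process itself is short enough to write out. The sketch below uses the closed form x_t = sqrt(ᾱ_t)·x_0 + sqrt(1 − ᾱ_t)·ε with a linear beta schedule (1e-4 to 0.02 over 1000 steps, the original DDPM defaults, used here as an assumption; SDXL works in latent space with its own schedule). It shows why predicting the noise is enough to recover the clean signal.

```python
import math
import random

def alpha_bar(t, T=1000, beta_start=1e-4, beta_end=0.02):
    """Cumulative product of (1 - beta_s) for s = 1..t under a linear schedule."""
    prod = 1.0
    for s in range(1, t + 1):
        beta = beta_start + (beta_end - beta_start) * (s - 1) / (T - 1)
        prod *= 1.0 - beta
    return prod

def add_noise(x0, eps, t):
    """Forward process: jump straight from x_0 to x_t."""
    ab = alpha_bar(t)
    return [math.sqrt(ab) * x + math.sqrt(1.0 - ab) * e for x, e in zip(x0, eps)]

def predict_x0(xt, eps_hat, t):
    """Invert the forward process given a noise estimate (the denoiser's job)."""
    ab = alpha_bar(t)
    return [(x - math.sqrt(1.0 - ab) * e) / math.sqrt(ab) for x, e in zip(xt, eps_hat)]

random.seed(0)
x0 = [0.5, -1.0, 0.25]                        # stand-in for a latent
eps = [random.gauss(0.0, 1.0) for _ in x0]
xt = add_noise(x0, eps, t=500)
recovered = predict_x0(xt, eps, t=500)        # perfect noise estimate -> exact recovery
print([round(v, 6) for v in recovered])
```

In a real model the noise estimate comes from the UNet or Transformer and is imperfect, which is why sampling takes many small denoising steps instead of one jump.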
SDXL + ControlNet with Diffusers
from diffusers import (StableDiffusionXLPipeline,
ControlNetModel,
StableDiffusionXLControlNetPipeline)
import torch
# Basic SDXL text-to-image
pipe = StableDiffusionXLPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0",
torch_dtype=torch.float16,
).to("cuda")
image = pipe(
prompt="futuristic city at dusk, cinematic lighting, 8k",
negative_prompt="blurry, low quality, cartoon",
num_inference_steps=30,
guidance_scale=7.5, # CFG: adherence to prompt
width=1024, height=1024,
).images[0]
# ControlNet: control composition with Canny edges / depth / pose
controlnet = ControlNetModel.from_pretrained(
"diffusers/controlnet-canny-sdxl-1.0",
torch_dtype=torch.float16
)
pipe_ctrl = StableDiffusionXLControlNetPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0",
controlnet=controlnet, torch_dtype=torch.float16,
).to("cuda")
# FLUX.1 — 2024 state of the art (open weights)
from diffusers import FluxPipeline
pipe = FluxPipeline.from_pretrained(
"black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16
).to("cuda")
Image LoRA Fine-Tuning + Video Gen
# HuggingFace diffusers training script
accelerate launch train_dreambooth_lora_sdxl.py \
--pretrained_model_name_or_path="stabilityai/stable-diffusion-xl-base-1.0" \
--instance_data_dir="my_images/" \
--instance_prompt="photo of sks dog" \
--output_dir="lora-weights/" \
--rank=16 --learning_rate=1e-4 \
--max_train_steps=1000 --mixed_precision="fp16"
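# Load the LoRA into an SDXL pipeline (the weights were trained against SDXL, not FLUX)
pipe = StableDiffusionXLPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0", torch_dtype=torch.float16
).to("cuda")
pipe.load_lora_weights("lora-weights/")
pipe.fuse_lora(lora_scale=0.9)
image = pipe("a photo of sks dog on the moon").images[0]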
Video gen 2025: Sora (OpenAI), Wan2.1 (Alibaba, open), Kling, Runway Gen-3. All use Diffusion Transformers (DiT) with temporal attention — treating video as 3D latent volumes.
Speech & Audio AI — ASR, TTS, Audio Generation
Whisper — Automatic Speech Recognition
OpenAI's multilingual ASR: 99 languages, trained on 680K hours of audio. Use faster-whisper (CTranslate2) for up to 4× faster inference at the same accuracy.
from faster_whisper import WhisperModel
# tiny / base / small / medium / large-v3
model = WhisperModel("large-v3", device="cuda",
compute_type="float16")
segments, info = model.transcribe(
"audio.mp3",
language="en", # None = auto-detect
beam_size=5,
word_timestamps=True, # word-level timing
vad_filter=True, # voice activity detection
)
for seg in segments:
print(f"[{seg.start:.2f}s → {seg.end:.2f}s] {seg.text}")
for word in seg.words:
print(f" {word.word!r} @{word.start:.2f}s")
TTS — Text to Speech Options
# ElevenLabs — best quality + voice cloning
from elevenlabs import ElevenLabs
client = ElevenLabs(api_key="xi-...")
audio = client.generate(
text="Hello, synthesized voice",
voice="Rachel",
model="eleven_multilingual_v2",
)
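# Kokoro: local, Apache 2.0, 82M params, RTF ~0.5 (real-time factor is a unitless ratio; below 1 is faster than real time)
from kokoro import KPipeline
import soundfile as sf
pipeline = KPipeline(lang_code="a") # 'a' = American English
# The pipeline is a generator yielding (graphemes, phonemes, audio) per text segment at 24 kHz
for i, (gs, ps, audio) in enumerate(pipeline("Hello world!", voice="af_sarah")):
    sf.write(f"kokoro_{i}.wav", audio, 24000)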
# OpenAI TTS — production, 6 voices
from openai import OpenAI
r = OpenAI().audio.speech.create(
model="tts-1-hd", voice="nova",
input="The quick brown fox"
)
r.stream_to_file("out.mp3")
MusicGen — Audio Generation
from audiocraft.models import MusicGen
from audiocraft.data.audio import audio_write
model = MusicGen.get_pretrained("facebook/musicgen-stereo-large")
model.set_generation_params(duration=8) # seconds
wav = model.generate([
"upbeat jazz piano with walking bass, 120 bpm",
"dark cinematic orchestral tension",
]) # shape: [B, C, T]
for i, wav_i in enumerate(wav):
audio_write(f"track_{i}", wav_i.cpu(),
model.sample_rate, strategy="loudness")
Real-Time Voice Agent Pipeline
Microphone → VAD → ASR → LLM → TTS → speaker. Target sub-500 ms end-to-end latency by streaming every stage instead of waiting for complete outputs.
from livekit.agents import AutoSubscribe, JobContext
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero
async def entrypoint(ctx: JobContext):
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
assistant = VoiceAssistant(
vad=silero.VAD.load(),
stt=deepgram.STT(),
llm=openai.LLM(model="gpt-4o-mini"),
tts=openai.TTS(voice="nova"),
)
assistant.start(ctx.room)
await assistant.say("How can I help?")
Reinforcement Learning — Q-Learning to GRPO
An agent takes actions in an environment, receives rewards, and learns a policy maximizing cumulative reward. Powers game AI, robotics, and crucially — LLM alignment (RLHF, GRPO).
PPO with stable-baselines3
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback
env = gym.make("CartPole-v1")
model = PPO(
"MlpPolicy", env,
learning_rate=3e-4,
n_steps=2048, # rollout length
batch_size=64,
n_epochs=10, # gradient updates per rollout
gamma=0.99, # discount
gae_lambda=0.95, # advantage estimation
clip_range=0.2, # PPO clip ε
verbose=1,
)
eval_cb = EvalCallback(env, best_model_save_path="./best/",
eval_freq=5000)
model.learn(total_timesteps=100_000, callback=eval_cb)
obs, _ = env.reset()
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)
    if terminated or truncated:  # CartPole-v1 truncates episodes at 500 steps
        obs, _ = env.reset()
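The `clip_range=0.2` setting above controls PPO's clipped surrogate objective. A per-sample sketch with hand-picked probability ratios (new policy probability divided by old) shows which updates get capped; the function name is illustrative and real implementations average this over a batch.

```python
def ppo_clip_objective(ratio, advantage, clip_range=0.2):
    """Per-sample PPO surrogate: min(r * A, clip(r, 1 - eps, 1 + eps) * A)."""
    clipped_ratio = max(1.0 - clip_range, min(1.0 + clip_range, ratio))
    return min(ratio * advantage, clipped_ratio * advantage)

good_pushed_far = ppo_clip_objective(1.5, +1.0)    # good action, ratio already above 1 + eps
bad_pushed_far = ppo_clip_objective(0.5, -1.0)     # bad action, ratio already below 1 - eps
bad_made_likelier = ppo_clip_objective(1.5, -1.0)  # bad action made MORE likely
print(good_pushed_far, bad_pushed_far, bad_made_likelier)
```

The first two cases are capped at 1.2 and -0.8, so there is no incentive to move the policy further in a direction it has already moved. The third is not capped (-1.5): mistakes are always fully penalised.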
GRPO — DeepSeek R1's Alignment Method
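Group Relative Policy Optimization. No critic/value model needed: sample a group of outputs per prompt, score them, and normalize each score against the group's mean and standard deviation. That removes the memory and compute cost of PPO's value network, which matters when the policy is a multi-billion-parameter LLM.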
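from datasets import Dataset
from trl import GRPOConfig, GRPOTrainer
from transformers import AutoTokenizer, AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-7B-Instruct")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-7B-Instruct")
# Toy prompts for illustration; extra columns such as "answer" are forwarded to the reward function
dataset = Dataset.from_list([
    {"prompt": "What is 7 * 8? Answer with a number.", "answer": "56"},
    {"prompt": "What is 12 + 30? Answer with a number.", "answer": "42"},
])
def reward_fn(completions, answer, **kwargs) -> list[float]:
    """Return a scalar reward per completion (1.0 if the reference answer appears in it)."""
    return [1.0 if a in c else 0.0 for c, a in zip(completions, answer)]
# Argument names follow recent TRL releases; check them against your installed version
trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    args=GRPOConfig(
        num_generations=8,  # group size G: completions compared against each other
        max_completion_length=512,
        learning_rate=1e-6,
        beta=0.01,  # KL penalty vs reference policy
        output_dir="grpo-model",
    ),
    reward_funcs=[reward_fn],
    train_dataset=dataset,
)
trainer.train()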
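The "compare outputs within a group, normalize scores" step is small enough to show directly. This sketch computes group-relative advantages for one prompt with 8 sampled completions and binary rewards; the use of population standard deviation and the `eps` constant are assumptions, and library implementations differ in such details.

```python
from statistics import mean, pstdev

def group_advantages(rewards, eps=1e-8):
    """Advantage of each completion relative to its own group: (r - mean) / (std + eps)."""
    mu, sigma = mean(rewards), pstdev(rewards)
    return [(r - mu) / (sigma + eps) for r in rewards]

rewards = [1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0]   # 3 of 8 completions were correct
adv = group_advantages(rewards)
print([round(a, 3) for a in adv])
```

Correct completions get a positive advantage and incorrect ones a negative advantage, with no value network involved. If every completion in a group receives the same reward, all advantages are zero and that prompt contributes no learning signal.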
RL Algorithm Cheatsheet — When to Use What
| Algorithm | Type | Use Case | Pros |
|---|---|---|---|
| Q-Learning / DQN | Value-based, off-policy | Discrete actions (Atari) | Sample efficient |
| SAC | Off-policy, entropy-reg | Continuous control (robotics) | Stable, sample efficient |
| PPO | On-policy, clipped | Game AI, RLHF | Reliable, widely supported |
| GRPO | Group comparison, no critic | LLM reasoning (DeepSeek R1) | No value model needed |
| DPO | Offline, direct preference | LLM fine-tuning alignment | No RL training loop at all |
| Constitutional AI | Self-critique + RL | LLM harmlessness (Anthropic) | Scalable without human labels |
Model Context Protocol & Tool Calling
Tool calling lets LLMs invoke external functions. MCP (Model Context Protocol, Anthropic 2024) is an open standard — like USB-C for AI tools. Any MCP server works with any MCP-compatible host (Claude Desktop, Cursor, VS Code).
Claude Tool Use — Agentic Loop
import anthropic, json
client = anthropic.Anthropic()
tools = [{
"name": "get_weather",
"description": "Get current weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string"},
"unit": {"type": "string", "enum": ["celsius","fahrenheit"]},
},
"required": ["location"],
},
}]
def get_weather(location, unit="celsius"):
return {"temp": 22, "conditions": "partly cloudy"}
messages = [{"role": "user", "content": "Weather in Tokyo?"}]
while True:
resp = client.messages.create(
model="claude-opus-4-7", tools=tools,
messages=messages, max_tokens=1024,
)
if resp.stop_reason != "tool_use": break # also exits on max_tokens / stop_sequence instead of looping with no tool results
tool_results = []
for block in resp.content:
if block.type == "tool_use":
result = get_weather(**block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": json.dumps(result),
})
messages += [{"role": "assistant", "content": resp.content},
{"role": "user", "content": tool_results}]
print(next(b for b in resp.content if b.type == "text").text)
Build an MCP Server (FastMCP)
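import json
import sqlite3
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("my-tools")
# Hypothetical local SQLite database with `products` and `orders` tables; swap in your own data layer
db = sqlite3.connect("shop.db", check_same_thread=False)
db.row_factory = sqlite3.Row
@mcp.tool()
def search_products(query: str, limit: int = 10) -> list[dict]:
    """Search product catalog by keyword."""
    rows = db.execute(
        "SELECT * FROM products WHERE name LIKE ? LIMIT ?",  # SQLite LIKE is case-insensitive for ASCII
        (f"%{query}%", limit),
    ).fetchall()
    return [dict(r) for r in rows]
@mcp.resource("orders://{order_id}")
def get_order(order_id: str) -> str:
    """Get order details by ID."""
    row = db.execute("SELECT * FROM orders WHERE id = ?", (order_id,)).fetchone()
    return json.dumps(dict(row)) if row else "order not found"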
@mcp.prompt()
def analyze_order(order_id: str) -> str:
return f"Analyze order {order_id} for anomalies"
if __name__ == "__main__":
mcp.run() # stdio by default
# HTTP: construct FastMCP("my-tools", port=8000), then call mcp.run(transport="streamable-http")
Register the server with an MCP host, for example in Claude Desktop's claude_desktop_config.json:
{
"mcpServers": {
"my-tools": {
"command": "python",
"args": ["/path/to/server.py"]
}
}
}
Robotics & Embodied AI
Foundation models leaving the cloud and entering the physical world — manipulation, locomotion, perception, and the sim-to-real gap.
The Robotics AI Stack
Modern robotics merges classical control with deep learning. The frontier: robot foundation models — large Vision-Language-Action (VLA) policies trained on diverse robot data that generalize across embodiments.
Robot Foundation Models
| Model | By | Key Innovation |
|---|---|---|
| RT-2 | Google DeepMind | PaLI-X VLM → action tokens. Web knowledge transfers to robots |
| π0 (pi-zero) | Physical Intelligence | VLA + diffusion action head. SOTA dexterous manipulation |
| OpenVLA | Stanford/Berkeley | 7B open-source VLA, fine-tunable on your robot data |
| Helix | Figure AI | Real-time dual-arm VLA on Figure 02 humanoid |
| UniSim | Google DeepMind / UC Berkeley | World model for robot simulation and planning |
Humanoid Robot Landscape (2025)
| Robot | Company | Status |
|---|---|---|
| Optimus Gen 2 | Tesla | Production-line testing, Gigafactory |
| Figure 02 | Figure AI | Commercial, BMW partnership |
| NEO Gamma | 1X Technologies | Home assistant, open-data strategy |
| Atlas | Boston Dynamics | Electric, Hyundai integration |
| GR-2 | Fourier Intelligence | Mass production, rehab + logistics |
| Unitree G1 | Unitree | $16K — most affordable humanoid |
ROS2 — Robot Operating System 2
Industry-standard middleware. DDS-based pub/sub, real-time capable, cross-platform. Used in autonomous vehicles, surgical robots, warehouse automation.
ROS2 Core Concepts — Node, Topic, Service
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist
class CameraNode(Node):
def __init__(self):
super().__init__("camera_node")
# Publisher: send camera frames at 30fps
self.pub = self.create_publisher(Image, "/camera/rgb", 10)
self.timer = self.create_timer(0.033, self.publish_frame)
# Subscriber: receive navigation commands
self.sub = self.create_subscription(
Twist, "/cmd_vel", self.on_cmd_vel, 10)
def publish_frame(self):
msg = Image()
msg.header.stamp = self.get_clock().now().to_msg()
msg.encoding = "rgb8"
# msg.data = capture_camera_bytes()
self.pub.publish(msg)
def on_cmd_vel(self, msg: Twist):
self.get_logger().info(
f"Moving: linear={msg.linear.x:.2f} "
f"angular={msg.angular.z:.2f}"
)
rclpy.init()
node = CameraNode()
rclpy.spin(node) # event-driven loop
MuJoCo Simulation + Domain Randomization
import gymnasium as gym
import numpy as np
# MuJoCo built-in robots
env = gym.make("HalfCheetah-v4", render_mode="rgb_array")
# Domain randomization — key for sim-to-real transfer
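_nominal = {}  # unperturbed parameters, cached per model so randomization never compounds
def randomize_physics(env):
    """Randomize friction and mass each episode, always relative to the nominal model.
    The policy must learn to handle all variations → transfers to real."""
    model = env.unwrapped.model
    friction0, mass0 = _nominal.setdefault(
        id(model), (model.geom_friction.copy(), model.body_mass.copy()))
    # Randomize friction (±50%), one scale factor per geom
    model.geom_friction[:] = friction0 * np.random.uniform(0.5, 1.5, size=(friction0.shape[0], 1))
    # Randomize body mass (±20%); index 0 is the world body
    model.body_mass[1:] = mass0[1:] * np.random.uniform(0.8, 1.2, size=mass0[1:].shape)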
# NVIDIA Isaac Lab: 4096 parallel envs on A100
# isaac_env = gym.make("Isaac-Velocity-Flat-Anymal-C-v0")
# GPU-parallel stepping gives far higher throughput than single-env CPU simulation, with optional photorealistic rendering
Sim stack in 2025: NVIDIA Isaac Lab for GPU-accelerated RL (4096 parallel envs). MuJoCo for manipulation research. PyBullet for quick prototypes. Genesis (2024) for generative world models.
Robot Learning — Imitation to Diffusion Policy
ACT — Action Chunking Transformer
Introduced with Stanford's ALOHA system. The policy predicts the next k actions as a chunk instead of one at a time, which shortens the effective horizon and reduces compounding errors in dexterous manipulation.
"""
ACT key ideas:
- CVAE encoder: encode action sequence → style latent z
- Transformer: obs + z → predict chunk of k=100 actions
- Temporal ensembling: average overlapping chunks
- Trained on teleoperation via ALOHA bimanual hardware
- Input: 4 camera views + joint positions
- Output: 100 joint position targets at 50 Hz = 2-second plan
"""
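# Import path varies by LeRobot release (older versions: lerobot.common.policies.act.modeling_act)
from lerobot.policies.act.modeling_act import ACTPolicy
# Pre-trained on HuggingFace Hub
policy = ACTPolicy.from_pretrained(
"lerobot/act_aloha_sim_transfer_cube_human"
)
policy.reset() # clear the cached action queue at the start of each episode
# Inference (tensors carry a leading batch dimension)
obs = {
"observation.images.top": img_tensor, # [B,C,H,W]
"observation.state": joint_positions_tensor, # [B,14]
}
# select_action returns ONE action per call, popped from an internally cached chunk
action = policy.select_action(obs) # [B, 14]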
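The "temporal ensembling" idea from the list above can be sketched in a few lines: when chunks overlap, several predictions exist for the same timestep, and they are blended with exponential weights. This sketch uses scalar actions, assumes one chunk is predicted per step starting at step 0, and takes the weighting w_i = exp(-m·i) with the oldest prediction at i = 0; `m=0.01` is an illustrative value.

```python
import math

def temporal_ensemble(chunks, t, m=0.01):
    """Blend every prediction made for timestep t.

    chunks[s] is the chunk predicted at step s, so chunks[s][k] targets step s + k.
    """
    preds = [chunk[t - s] for s, chunk in enumerate(chunks) if 0 <= t - s < len(chunk)]
    # preds[0] comes from the oldest chunk and gets weight exp(0) = 1
    weights = [math.exp(-m * i) for i in range(len(preds))]
    return sum(w * p for w, p in zip(weights, preds)) / sum(weights)

chunks = [
    [0.0, 1.0, 2.0, 3.0],   # predicted at step 0 for steps 0..3
    [1.2, 2.2, 3.2, 4.2],   # predicted at step 1 for steps 1..4
    [2.1, 3.1, 4.1, 5.1],   # predicted at step 2 for steps 2..5
]
a2 = temporal_ensemble(chunks, t=2)   # blends 2.0, 2.2 and 2.1
print(round(a2, 4))
```

Averaging smooths out disagreement between successive chunks, at the cost of running the policy at every step instead of once per chunk.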
Diffusion Policy
Model robot actions as a denoising diffusion process. Handles multimodal action distributions naturally — a robot can pick up from left OR right without mode averaging.
"""
Diffusion Policy (Chi et al., 2023):
- Treat action trajectory like an image — learn to denoise it
- DDPM / DDIM scheduler (16 denoising steps at inference)
- U-Net or Transformer denoises, conditioned on observation
- Naturally multimodal: represents all valid grasp modes
- No mode averaging (behavior cloning with an MSE regression loss averages the modes → bad actions)
"""
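# Import path varies by LeRobot release (older versions: lerobot.common.policies.diffusion.modeling_diffusion)
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
policy = DiffusionPolicy.from_pretrained("lerobot/diffusion_pusht")
obs = {
"observation.image": img_tensor,
"observation.state": state_tensor,
}
# Internally: start from Gaussian noise → denoise → action chunk, cached in a queue
# select_action returns one action per call from that chunk
action = policy.select_action(obs) # [B, 2]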
Start here: LeRobot (HuggingFace) has 100+ teleoperation datasets, pre-trained ACT/DiffusionPolicy, and one-command training for SO-100 and ALOHA robots.
Quantum Computing
Qubits, superposition, entanglement — and what quantum actually means for AI. From Qiskit circuits to QAOA optimization and the honest timeline for quantum advantage.
Quantum Fundamentals — Qubits, Gates, Circuits
Classical bits are 0 or 1. A qubit exists in superposition: α|0⟩ + β|1⟩ where |α|² + |β|² = 1. Entanglement links qubits non-locally. Interference cancels wrong answer paths and amplifies correct ones — that's the quantum speedup.
Essential Quantum Gates
| Gate | Effect |
|---|---|
| H (Hadamard) | |0⟩→(|0⟩+|1⟩)/√2 — creates superposition |
| X (Pauli-X) | |0⟩↔|1⟩ — quantum NOT gate |
| Z (Pauli-Z) | |1⟩→−|1⟩ — phase flip |
| CNOT | Flip target if control=|1⟩ — creates entanglement |
| T gate | |1⟩→e^{iπ/4}|1⟩ (the "π/8 gate") — needed for universal QC |
| Rx/Ry/Rz(θ) | Arbitrary Bloch sphere rotation — parameterized circuits |
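The H and CNOT rows of the table can be checked with a tiny statevector simulator in plain Python. This is a sketch for two qubits with qubit 0 as the least significant bit (the same ordering convention Qiskit uses); the helper names are illustrative.

```python
import math

def apply_h(state, q):
    """Apply a Hadamard to qubit q of a statevector (list of complex amplitudes)."""
    out = [0j] * len(state)
    s = 1.0 / math.sqrt(2.0)
    for i, amp in enumerate(state):
        bit = (i >> q) & 1
        i0, i1 = i & ~(1 << q), i | (1 << q)
        # H|0> = (|0> + |1>)/sqrt(2),  H|1> = (|0> - |1>)/sqrt(2)
        out[i0] += s * amp
        out[i1] += s * amp * (-1 if bit else 1)
    return out

def apply_cnot(state, control, target):
    """Flip the target bit of every basis state whose control bit is 1."""
    out = list(state)
    for i in range(len(state)):
        if (i >> control) & 1:
            out[i] = state[i ^ (1 << target)]
    return out

state = [1 + 0j, 0j, 0j, 0j]          # |00>
state = apply_h(state, q=0)           # superposition on qubit 0
state = apply_cnot(state, control=0, target=1)
probs = [abs(a) ** 2 for a in state]  # index 0 = |00>, index 3 = |11>
print([round(p, 3) for p in probs])
```

All the probability sits on |00⟩ and |11⟩, which is the Bell state: measuring one qubit fixes the outcome of the other.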
Qiskit — Hello Quantum World
from qiskit import QuantumCircuit
from qiskit.primitives import StatevectorSampler
# Build Bell State: (|00⟩ + |11⟩) / √2
qc = QuantumCircuit(2, 2)
qc.h(0) # superposition on q0
qc.cx(0, 1) # CNOT: entangle q0 → q1
qc.measure([0, 1], [0, 1])
# Simulate locally
sampler = StatevectorSampler()
counts = sampler.run([qc], shots=1024).result()[0]\
.data.c.get_counts()
# {'00': ~512, '11': ~512} — never '01' or '10'
print(qc.draw("text"))
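# Run on real IBM quantum hardware (free tier available)
from qiskit import transpile
from qiskit_ibm_runtime import QiskitRuntimeService, SamplerV2
service = QiskitRuntimeService(
channel="ibm_quantum", token="YOUR_IBM_TOKEN" # channel name depends on the runtime version; newer releases use "ibm_quantum_platform"
)
backend = service.least_busy(operational=True, simulator=False)
isa_qc = transpile(qc, backend=backend) # hardware primitives require circuits in the backend's native gate set
job = SamplerV2(mode=backend).run([isa_qc], shots=1024)
print(job.result()[0].data.c.get_counts())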
Quantum Algorithms — Grover, Shor, VQE, QAOA
Grover's Algorithm — Quadratic Search Speedup
Classical unstructured search: O(N). Grover's: O(√N). For 1M items: 1M → 1,000 steps. Works via amplitude amplification — repeatedly boosting the marked state's probability.
from qiskit.circuit.library import PhaseOracle
from qiskit_algorithms import Grover, AmplificationProblem # qiskit.algorithms was removed in Qiskit 1.0
from qiskit.primitives import Sampler
# Oracle marks the target state "11" (boolean: x0 AND x1)
oracle = PhaseOracle("x0 & x1")
problem = AmplificationProblem(
oracle,
is_good_state=["11"], # what we're searching for
)
grover = Grover(sampler=Sampler())
result = grover.amplify(problem)
print(result.top_measurement) # "11" with high probability
print(result.max_probability) # close to 1.0
# Optimal iterations ≈ π/4 × √(N/M)
# N = search space size, M = number of solutions
# 2 qubits → N=4, M=1 → 1 iteration optimal
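The iteration formula in the comments above can be evaluated classically, together with the textbook success probability sin²((2k+1)θ) where sin θ = √(M/N). This is plain arithmetic, not a quantum simulation; the function names are illustrative.

```python
import math

def grover_iterations(n_items, n_solutions=1):
    """Optimal iteration count, floor(pi/4 * sqrt(N/M)), with at least one iteration."""
    return max(1, math.floor(math.pi / 4 * math.sqrt(n_items / n_solutions)))

def success_probability(n_items, n_solutions, k):
    """Probability of measuring a marked state after k Grover iterations."""
    theta = math.asin(math.sqrt(n_solutions / n_items))
    return math.sin((2 * k + 1) * theta) ** 2

k_small = grover_iterations(4)                    # the 2-qubit example above
p_small = success_probability(4, 1, k_small)
k_large = grover_iterations(1_000_000)            # order sqrt(N), scaled by pi/4
p_large = success_probability(1_000_000, 1, k_large)
print(k_small, round(p_small, 6), k_large, round(p_large, 6))
```

Running more iterations than the optimum makes the success probability fall again, so the count has to be chosen, not just maximised.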
Shor's Algorithm — RSA Threat
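Factors an integer N in time polynomial in its bit length, roughly O((log N)³). That breaks RSA directly, and a closely related quantum algorithm for discrete logarithms breaks DSA and ECDSA. Running it at cryptographic key sizes needs fault-tolerant hardware, commonly estimated at millions of physical qubits and 10-20 years away.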
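Only the period-finding step of Shor's algorithm is quantum; turning a period into factors is classical number theory. The sketch below brute-forces the period for a toy modulus (which is exactly the part that does not scale classically) and then applies the gcd step. Function names are illustrative.

```python
from math import gcd

def find_period(a, n):
    """Smallest r > 0 with a**r % n == 1. Brute force here; Shor's finds r with QPE + QFT."""
    r, value = 1, a % n
    while value != 1:
        value = (value * a) % n
        r += 1
    return r

def factor_from_period(a, n):
    """Return a pair of non-trivial factors of n, or None if this choice of a fails."""
    if gcd(a, n) != 1:
        return tuple(sorted((gcd(a, n), n // gcd(a, n))))  # lucky guess: a shares a factor
    r = find_period(a, n)
    if r % 2 == 1:
        return None                     # odd period: retry with another a
    half = pow(a, r // 2, n)
    if half == n - 1:
        return None                     # a^(r/2) is -1 mod n: retry with another a
    return tuple(sorted((gcd(half - 1, n), gcd(half + 1, n))))

period = find_period(7, 15)
factors = factor_from_period(7, 15)
print(period, factors)
```

For a = 7 and N = 15 the period is 4, so 7² = 49 ≡ 4 (mod 15) and gcd(4 ± 1, 15) yields the factors 3 and 5. Choosing a = 14 fails the second check and would require a retry.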
"""
Shor's algorithm outline:
1. Choose random a < N
2. Find period r of f(x) = aˣ mod N
— Uses Quantum Phase Estimation (QPE) + QFT
— This is the O(log³N) quantum speedup step
3. If r is even and a^(r/2) ≢ -1 (mod N):
gcd(a^(r/2) ± 1, N) gives a factor
Why it breaks RSA:
- RSA-2048 relies on factoring being classically hard
- Classical best (number field sieve): exp(O(n^(1/3) (log n)^(2/3))) for an n-bit N, sub-exponential
- Shor's: O(n³), polynomial, a superpolynomial speedup
- Also breaks ECDSA (elliptic curve) and DSA
Post-quantum crypto (safe from Shor's), NIST standards finalized in 2024:
CRYSTALS-Kyber → ML-KEM (FIPS 203), key encapsulation ✅ standardized
CRYSTALS-Dilithium → ML-DSA (FIPS 204), digital signatures ✅ standardized
SPHINCS+ → SLH-DSA (FIPS 205), hash-based signatures ✅ standardized
AES-256 is quantum-safe (Grover halves key strength → 128 bits)
"""
VQE — Quantum Chemistry (Near-Term)
Variational Quantum Eigensolver finds the ground-state energy of molecules. Hybrid classical-quantum. Often cited as the most likely first domain for practical quantum advantage: drug discovery and materials science.
from qiskit_nature.second_q.drivers import PySCFDriver
from qiskit_nature.second_q.mappers import JordanWignerMapper
from qiskit_algorithms import VQE
from qiskit_algorithms.optimizers import SLSQP
from qiskit.circuit.library import TwoLocal
from qiskit.primitives import Estimator
# Hydrogen molecule at equilibrium bond length
driver = PySCFDriver(atom="H .0 .0 .0; H .0 .0 0.735",
basis="sto3g")
problem = driver.run()
# Map fermionic Hamiltonian → qubit operators
qubit_op = JordanWignerMapper().map(problem.second_q_ops()[0])
# Parameterized ansatz: trial wave function
ansatz = TwoLocal(rotation_blocks="ry",
entanglement_blocks="cz", reps=2)
# Hybrid loop: quantum circuit → classical optimizer → repeat
result = VQE(Estimator(), ansatz, SLSQP())\
.compute_minimum_eigenvalue(qubit_op)
print(f"H₂ electronic ground-state energy: {result.eigenvalue.real:.6f} Hartree") # excludes nuclear repulsion
QAOA — Combinatorial Optimization
Quantum Approximate Optimization Algorithm. Targets NP-hard problems (MaxCut, TSP, scheduling). Works on today's NISQ hardware.
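from qiskit_optimization.problems import QuadraticProgram
from qiskit_optimization.algorithms import MinimumEigenOptimizer
from qiskit_algorithms import QAOA
from qiskit_algorithms.optimizers import COBYLA
from qiskit.primitives import Sampler
# MaxCut: partition graph to maximize edges crossing the cut
edges = [(0, 1), (1, 2), (2, 3), (0, 2)]
qp = QuadraticProgram()
qp.binary_var_list(4) # 4 nodes, x_i = which side of the cut node i is on
# Edge (i, j) is cut when x_i != x_j, i.e. when x_i + x_j - 2*x_i*x_j = 1
# Objective: minimize -(number of cut edges) → linear term -deg(i), quadratic term +2 per edge
linear = [0, 0, 0, 0]
for i, j in edges:
    linear[i] -= 1
    linear[j] -= 1
qp.minimize(linear=linear, quadratic={e: 2 for e in edges})
# QAOA with p=2 layers (more layers → better approximation); the classical optimizer is required
qaoa = QAOA(sampler=Sampler(), optimizer=COBYLA(), reps=2)
result = MinimumEigenOptimizer(qaoa).solve(qp)
print(result.x) # e.g. [0,1,0,1], one optimal partition
print(result.fval) # objective value = -(cut edges); the optimum for this graph is -3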
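For a graph this small, a brute-force search gives the ground truth to compare a QAOA result against. The sketch below enumerates all 16 partitions of the 4-node graph with edges (0,1), (1,2), (2,3), (0,2) used above; the helper name is illustrative.

```python
from itertools import product

edges = [(0, 1), (1, 2), (2, 3), (0, 2)]

def cut_size(assignment, edges):
    """Number of edges whose endpoints land on different sides of the partition."""
    return sum(1 for i, j in edges if assignment[i] != assignment[j])

all_assignments = list(product([0, 1], repeat=4))
best_size = max(cut_size(x, edges) for x in all_assignments)
optimal = [x for x in all_assignments if cut_size(x, edges) == best_size]
print(best_size, optimal)
```

The triangle 0-1-2 means at most two of its three edges can be cut, so the best achievable cut here is 3 edges. Brute force costs 2ⁿ evaluations, which is why heuristics (classical or quantum) are needed beyond a few dozen nodes.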
Quantum Machine Learning — Honest Assessment
Quantum Neural Networks with PennyLane
Parameterized quantum circuits as differentiable layers. Gradients come from backpropagation on simulators or from the parameter-shift rule on real hardware. Integrates with PyTorch or JAX.
import pennylane as qml
import torch
n_qubits = 4
dev = qml.device("default.qubit", wires=n_qubits)
@qml.qnode(dev, interface="torch")
def quantum_circuit(inputs, weights):
# Encode classical data into quantum state
qml.AngleEmbedding(inputs, wires=range(n_qubits))
# Parameterized entangling layers
qml.BasicEntanglerLayers(weights, wires=range(n_qubits))
# Measure: return expectation values
return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]
# Wrap as a PyTorch layer
qlayer = qml.qnn.TorchLayer(
quantum_circuit,
weight_shapes={"weights": (3, n_qubits)}
)
# Hybrid classical → quantum → classical model
model = torch.nn.Sequential(
torch.nn.Linear(8, n_qubits), # classical encoder
qlayer, # quantum processing
torch.nn.Linear(n_qubits, 2), # classical decoder
)
# Train with standard PyTorch optimizer + backprop
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
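The parameter-shift rule can be verified on the simplest possible circuit: RY(θ) applied to |0⟩ followed by a Pauli-Z measurement, whose expectation value is cos θ. The sketch below uses that closed form in place of a quantum device (an assumption made so the example runs anywhere) and compares the shifted-evaluation gradient with the analytic derivative.

```python
import math

def expval_z(theta):
    """<Z> after RY(theta) on |0>; analytically cos(theta). Stands in for a circuit run."""
    return math.cos(theta)

def parameter_shift_grad(f, theta, shift=math.pi / 2):
    """Exact gradient from two extra circuit evaluations: (f(t + s) - f(t - s)) / 2."""
    return (f(theta + shift) - f(theta - shift)) / 2

theta = 0.7
grad = parameter_shift_grad(expval_z, theta)
analytic = -math.sin(theta)
print(round(grad, 6), round(analytic, 6))
```

Unlike a finite-difference estimate, the shift is large (π/2) and the result is exact for gates of this form, which makes it usable on noisy hardware where tiny perturbations would be lost in shot noise.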
Quantum ML — Timeline & Honest Take
| Technique | Status | When Matters |
|---|---|---|
| VQE (chemistry) | Near-term real use | <5 yr: drug discovery |
| QAOA (optimization) | NISQ-era, niche | <5 yr: scheduling |
| Quantum kernels/SVM | Research scale | Advantage unproven on real data |
| QNN / PQC | Research | Barren plateau problem unsolved |
| Shor's (RSA break) | Requires fault-tolerant | 10-20 years |
| Grover's (ML speedup) | Theoretical | Needs fault-tolerant QC |
| Hardware | Qubits (2025) | SDK |
|---|---|---|
| IBM Quantum | 1,121 (Condor); 156 (Heron r2) | Qiskit |
| Google Quantum AI | 105 (Willow) | Cirq |
| Quantinuum | 56 (H2-1) | pytket |
| IonQ | 36 (Forte) | ionq SDK |
| PsiQuantum | ~1M by 2027? | Photonic |
Honest quantum ML take for 2025: Classical ML still beats quantum ML on every real-world benchmark. The "quantum advantage" for ML is unproven. But — VQE for molecular simulation and QAOA for small combinatorial problems show real promise. Learn Qiskit now. When fault-tolerant quantum arrives (~2030-2035), the engineers who understand both ML and quantum algorithms will design the next generation of models. That intersection is tiny and extremely valuable.
🏆 Complete Learning Map — All 7 Parts
| Domain | Foundation | Production | Frontier |
|---|---|---|---|
| RAG | Basic pipeline, chunking, embeddings | HyDE, GraphRAG, RAGAS eval | Agentic RAG, multimodal, self-RAG |
| LLMs | Transformer internals, attention | LoRA fine-tuning, vLLM serving | DPO, GRPO, test-time compute |
| Agents | ReAct, tool calling, MCP | LangGraph multi-agent, HITL | Multi-agent, sub-graphs, CrewAI |
| Computer Vision | CNNs, ViT, CLIP | YOLOv8, SAM, diffusers | Video gen (DiT), multimodal VLMs |
| Speech/Audio | Whisper ASR, TTS basics | Real-time voice agent pipelines | GPT-4o audio, MusicGen |
| RL | MDP, Q-learning, PPO | RLHF pipeline, DPO | GRPO, Constitutional AI |
| Cloud / MLOps | AWS S3/EC2/Lambda | ECS, W&B, MLflow | GPU clusters, feature stores |
| Robotics | ROS2 basics, MuJoCo sim | ACT, Diffusion Policy, LeRobot | VLA models (RT-2, π0), humanoids |
| Quantum | Qubits, gates, Qiskit | VQE, QAOA, Grover's | QNN (PennyLane), fault-tolerant |