📚 2025–2026 Edition · Regularly Updated

Retrieval-Augmented Generation Mastery

From Basic RAG to GraphRAG, Agentic Systems, and Production Deployments — every technique explained with architecture diagrams and real code examples.

12+ Techniques Covered · 25+ Code Examples · 6 Vector DBs Compared · 3 Eval Frameworks

Basic RAG Architecture

RAG grounds an LLM's responses in retrieved, real-world documents, which reduces hallucinations and makes knowledge updatable without retraining.

Pipeline: User Query ("What is RAG?") → Embed Query (text-embedding-3) → Vector Search (top-K chunks from the 📦 Vector Store of indexed docs) → Build Prompt (context + query) → LLM Generate
Python · OpenAI + ChromaDB
from openai import OpenAI
import chromadb

client = OpenAI()
chroma = chromadb.Client()
collection = chroma.get_or_create_collection("docs")

def embed(text: str) -> list[float]:
    res = client.embeddings.create(model="text-embedding-3-small", input=text)
    return res.data[0].embedding

def index_documents(docs: list[str]):
    """Embed and store documents in the vector store."""
    embeddings = [embed(doc) for doc in docs]
    collection.add(
        documents=docs,
        embeddings=embeddings,
        ids=[f"doc_{i}" for i in range(len(docs))]
    )

def rag_query(question: str, k: int = 3) -> str:
    # 1️⃣ Embed the user query
    q_emb = embed(question)

    # 2️⃣ Retrieve top-k similar chunks
    results = collection.query(query_embeddings=[q_emb], n_results=k)
    context = "\n\n".join(results["documents"][0])

    # 3️⃣ Augment prompt with retrieved context
    prompt = f"""Use ONLY the context below to answer the question.

Context:
{context}

Question: {question}"""

    # 4️⃣ Generate answer
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Usage
index_documents(["RAG grounds LLMs in real documents...", "Retrieval happens via vector search..."])
answer = rag_query("What is RAG?")
print(answer)
💡

Why RAG beats fine-tuning for most use-cases: RAG lets you update knowledge in seconds (re-index), costs only the embedding calls for the changed documents, and lets answers cite their sources. Fine-tuning bakes knowledge into weights, which is expensive, slow, and opaque.

Chunking Strategies

How you split documents is as important as the retrieval model. The wrong chunk size hurts both recall and precision: chunks that are too small lose context, and chunks that are too large dilute the match.

✂️

Fixed-Size

Split every N tokens. Simple but breaks sentences mid-thought.

Fast
🔤

Sentence

NLTK / spaCy sentence boundaries. Clean, natural splits.

Balanced
🧠

Semantic

Embed sentences, split on cosine distance jumps. Slow but accurate.

Best Quality
🌳

RAPTOR

Recursive summarization tree. Handles multi-scale questions.

Multi-Level
📑

Document-Aware

Markdown headers, HTML tags, PDF structure as boundaries.

Structure-Safe
🔗

Sliding Window

Overlapping chunks (e.g. 512 tokens, 64 overlap) preserve context.

Context-Safe
📦

Parent-Child

Store small chunks for search, return large parent for generation.

Precision+Context
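The "split on cosine distance jumps" idea behind semantic chunking can be sketched without any library. This is a minimal illustration, not the algorithm of a specific package: the 2-D vectors stand in for real sentence embeddings, and the helper names (`cosine_distance`, `percentile`, `semantic_split`) are hypothetical.

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

def percentile(values: list[float], pct: float) -> float:
    """Linear-interpolation percentile of a non-empty list."""
    ranked = sorted(values)
    pos = pct / 100 * (len(ranked) - 1)
    lo, hi = math.floor(pos), math.ceil(pos)
    return ranked[lo] + (ranked[hi] - ranked[lo]) * (pos - lo)

def semantic_split(sentences: list[str], vectors: list[list[float]], pct: float = 85.0) -> list[list[str]]:
    """Start a new chunk wherever the gap to the previous sentence exceeds the percentile threshold."""
    if len(sentences) < 2:
        return [list(sentences)] if sentences else []
    gaps = [cosine_distance(vectors[i], vectors[i + 1]) for i in range(len(vectors) - 1)]
    threshold = percentile(gaps, pct)
    chunks, current = [], [sentences[0]]
    for sentence, gap in zip(sentences[1:], gaps):
        if gap > threshold:
            chunks.append(current)
            current = []
        current.append(sentence)
    chunks.append(current)
    return chunks

# Toy data: three "cat" sentences followed by two "GPU" sentences.
sentences = ["Cats purr.", "Cats nap a lot.", "Kittens play.", "GPUs run matrix math.", "CUDA targets GPUs."]
vectors = [[1.0, 0.1], [0.9, 0.2], [1.0, 0.15], [0.1, 1.0], [0.2, 0.9]]
chunks = semantic_split(sentences, vectors)
print(chunks)
```

With real embeddings the threshold percentile is the main tuning knob: a higher value yields fewer, larger chunks.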
Python · Semantic Chunking
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

# Semantic chunker splits where embedding similarity drops sharply
splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(model="text-embedding-3-small"),
    breakpoint_threshold_type="percentile",   # or "standard_deviation"
    breakpoint_threshold_amount=85,            # split at 85th percentile jumps
)

docs = splitter.create_documents([long_text])
print(f"Created {len(docs)} semantically coherent chunks")

# ── Parent-Child pattern ──
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter

parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter  = RecursiveCharacterTextSplitter(chunk_size=400)

retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=InMemoryStore(),
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)
# Search returns small child chunks → fetches big parent for LLM context
Strategy | Chunk Size Sweet Spot | Best For | Main Risk
Fixed-Size | 512–1024 tokens | Homogeneous text, high throughput | Splits mid-sentence
Semantic | Variable | Technical docs, research papers | Slow & costly to compute
Parent-Child | Child 200–400 / Parent 1500–2000 | Balancing precision + context | Storage overhead
RAPTOR | Multi-level summaries | Long-form, multi-section docs | Complexity, latency
Sliding Window | 512 + 64 overlap | Dense factual text (legal, medical) | Duplicate info in retrieval
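The sliding-window row (512 tokens with 64 overlap) is easy to get wrong by one step, so here is a small sketch. It assumes the text has already been tokenized; the synthetic `t0..t999` tokens and the function name `sliding_window_chunks` are placeholders, not part of any library.

```python
def sliding_window_chunks(tokens: list[str], size: int = 512, overlap: int = 64) -> list[list[str]]:
    """Split a token list into windows of `size` tokens that share `overlap` tokens."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be non-negative and smaller than size")
    step = size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # this window already reaches the end of the text
    return chunks

# Synthetic tokens stand in for real tokenizer output.
tokens = [f"t{i}" for i in range(1000)]
chunks = sliding_window_chunks(tokens, size=512, overlap=64)
print(len(chunks), [len(c) for c in chunks])
```

Each window starts `size - overlap` tokens after the previous one, so the last 64 tokens of one chunk are the first 64 of the next.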

Embedding Models

Embeddings are the heart of retrieval quality. Different models excel at different domains and languages.

🏆 OpenAI text-embedding-3-large

Best all-around for English. 3072 dims, supports Matryoshka truncation. ~$0.13/M tokens.

MTEB Score
64.6
Speed
Fast

🚀 Cohere embed-v3

Multilingual (100+ langs), 1024 dims, supports int8 quantization. Input-type aware (query vs doc).

MTEB Score
64.0
Multilingual
★★★★★

🔓 nomic-embed-text-v1.5 (Open)

8192 token context (vs 512 for most), Apache 2.0, runs locally. Excellent for long docs.

Context Length
8192
Cost
$0

⚡ ColBERT / ColPali

Late-interaction: embed every token, not just [CLS]. MaxSim retrieval — dramatically better recall.

Recall@10
98%
Storage
High
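To make the late-interaction idea concrete, here is a minimal MaxSim sketch in plain Python. The 2-D token vectors are toy values and `maxsim_score` is a hypothetical helper; real ColBERT-style systems use learned per-token embeddings and optimized indexes.

```python
def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

def maxsim_score(query_tokens: list[list[float]], doc_tokens: list[list[float]]) -> float:
    """For each query token take its best-matching document token, then sum those maxima."""
    return sum(max(dot(q, d) for d in doc_tokens) for q in query_tokens)

# Toy per-token embeddings: the query has two distinct "concepts".
query = [[1.0, 0.0], [0.0, 1.0]]
doc_a = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]]  # covers both query tokens
doc_b = [[1.0, 0.0], [0.9, 0.1]]              # covers only the first query token

scores = {"doc_a": maxsim_score(query, doc_a), "doc_b": maxsim_score(query, doc_b)}
ranking = sorted(scores, key=scores.get, reverse=True)
print(scores, ranking)
```

Because every query token must find its own match, a document that covers only part of the query scores lower, which is the intuition behind the recall gains. The cost is one vector per token, which explains the "Storage: High" rating above.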
Python · Matryoshka Representation Learning (MRL)
# MRL: truncate embedding dimensions without retraining
# text-embedding-3-* supports this natively

from openai import OpenAI

client = OpenAI()

def embed_mrl(text: str, dims: int = 256) -> list[float]:
    """
    Smaller dims = cheaper storage + faster search, small accuracy drop.
    dims=256  → 12× smaller than the 3072-dim default, ~2% accuracy loss
    dims=1536 → balanced (matches text-embedding-3-small's default size)
    dims=3072 → maximum quality (full text-embedding-3-large output)
    """
    res = client.embeddings.create(
        model="text-embedding-3-large",
        input=text,
        dimensions=dims   # ← MRL truncation
    )
    return res.data[0].embedding

# Hybrid retrieval: combine dense + sparse (BM25)
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

bm25 = BM25Retriever.from_documents(docs)
bm25.k = 4

dense = vectorstore.as_retriever(search_kwargs={"k": 4})

# 60% dense, 40% BM25 — best of both worlds
hybrid = EnsembleRetriever(
    retrievers=[bm25, dense],
    weights=[0.4, 0.6]
)
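If full-size vectors are already stored, MRL-trained embeddings can also be shortened client-side instead of re-calling the API with `dimensions`. The sketch below assumes the usual recipe of truncating and then re-normalizing to unit length so cosine and dot-product scores stay comparable; `truncate_and_renormalize` is a hypothetical helper and the 4-D vector is a toy value.

```python
import math

def truncate_and_renormalize(embedding: list[float], dims: int) -> list[float]:
    """Keep the first `dims` components and rescale the result to unit length."""
    if not 0 < dims <= len(embedding):
        raise ValueError("dims must be between 1 and the embedding length")
    head = embedding[:dims]
    norm = math.sqrt(sum(x * x for x in head))
    if norm == 0:
        return head  # nothing to rescale
    return [x / norm for x in head]

full = [0.6, 0.0, 0.8, 0.0]          # unit-length toy vector
short = truncate_and_renormalize(full, 2)
print(short)
```

Skipping the re-normalization step leaves shortened vectors with norms below 1, which skews dot-product rankings.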

Vector Databases

Choosing the right vector store depends on scale, latency requirements, filtering needs, and whether you need managed hosting.

Database | Best For | Filtering | Scale | Managed | Open Source
Qdrant | Production RAG, Rust-based speed | ✅ Payload filters | 1B+ vectors | ✓ Cloud | ✓ Apache 2.0
Pinecone | Serverless, zero-ops | ✅ Metadata | Unlimited | ✓ Fully managed | -
pgvector | Existing PostgreSQL users | ✅ Full SQL | ~10M vectors | ✓ Supabase/RDS | ✓ PostgreSQL ext
Weaviate | Multi-modal, hybrid search built-in | ✅ GraphQL | 100M+ vectors | ✓ Cloud | ✓ BSD 3
Chroma | Local dev, prototyping | ✅ Where filters | ~1M vectors | - | ✓ Apache 2.0
Milvus | Billion-scale enterprise | ✅ Scalar + vector | 10B+ vectors | ✓ Zilliz Cloud | ✓ Apache 2.0
Python · Qdrant with Payload Filtering
from qdrant_client import QdrantClient, models

client = QdrantClient(url="http://localhost:6333")

# Create collection with HNSW index
client.create_collection(
    collection_name="docs",
    vectors_config=models.VectorParams(size=1536, distance=models.Distance.COSINE),
    hnsw_config=models.HnswConfigDiff(m=16, ef_construct=100)  # tune for recall/speed
)

# Upsert with metadata payload
client.upsert(
    collection_name="docs",
    points=[
        models.PointStruct(
            id=1,
            vector=embed("Climate change impacts 2024"),
            payload={"source": "ipcc.pdf", "year": 2024, "category": "climate"}
        )
    ]
)

# Filtered semantic search — only docs from 2024+, climate category
results = client.search(
    collection_name="docs",
    query_vector=embed("global warming effects"),
    query_filter=models.Filter(
        must=[
            models.FieldCondition(key="year", range=models.Range(gte=2024)),
            models.FieldCondition(key="category", match=models.MatchValue(value="climate"))
        ]
    ),
    limit=5
)

Advanced RAG

Each technique solves a specific failure mode in Naive RAG. Understand the problem first, then apply the fix.

🔮

HyDE — Hypothetical Document Embeddings Medium

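Problem: User queries are short & ambiguous. Documents are long & specific. Both are embedded by the same model, but they tend to land in different regions of the embedding space, so a query can sit far from the passage that answers it.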

Flow: User Query ("How do vaccines work?") → LLM Generates Hypothetical Answer ("Vaccines train immune...") → Embed Hypothesis (doc-like vector space) → Retrieve Real Docs (better match!) → LLM
Python · HyDE
from openai import OpenAI
client = OpenAI()

def hyde_retrieve(query: str, vectorstore, k: int = 5):
    # Step 1: Generate a hypothetical document
    hyp_doc = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "system",
            "content": "Write a factual paragraph that would directly answer this question."
        }, {
            "role": "user",
            "content": query
        }]
    ).choices[0].message.content

    # Step 2: Embed the hypothesis (lives in doc-space, not query-space)
    hyp_embedding = embed(hyp_doc)

    # Step 3: Search with the hypothesis embedding
    results = vectorstore.similarity_search_by_vector(hyp_embedding, k=k)
    return results

# HyDE is reported to improve recall on knowledge-intensive tasks (the ~15-20% figure varies by dataset); measure it on your own queries
🔀

RAG-Fusion — Multi-Query + RRF Medium

Problem: A single query misses relevant docs phrased differently. Solution: generate N query variations and fuse their ranked results.

Flow: Original Query (1 question) → Query variants 1, 2, 3 → Vector Search (3× independent) → RRF Fusion (Reciprocal Rank re-ranking) → Generate (best context)
Python · RAG-Fusion with RRF
from langchain.load import dumps, loads

def generate_query_variants(query: str, n: int = 4) -> list[str]:
    prompt = f"""Generate {n} different ways to ask this question.
Output only the questions, one per line.
Original: {query}"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
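    # Drop blank lines so empty strings are never sent to the retriever
    lines = response.choices[0].message.content.splitlines()
    return [line.strip() for line in lines if line.strip()]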

def reciprocal_rank_fusion(results: list[list], k: int = 60) -> list:
    """RRF score = Σ 1/(k + rank). Promotes docs appearing high across many queries."""
    fused_scores: dict = {}
    for docs in results:
        for rank, doc in enumerate(docs):
            doc_str = dumps(doc)
            fused_scores[doc_str] = fused_scores.get(doc_str, 0) + 1 / (k + rank + 1)
    return [
        loads(doc) for doc, _ in
        sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

def rag_fusion_query(query: str, vectorstore, k: int = 5):
    variants = generate_query_variants(query)
    all_results = [vectorstore.similarity_search(q, k=k) for q in variants]
    return reciprocal_rank_fusion(all_results)[:k]
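A worked example makes the RRF arithmetic easier to check by hand. This sketch uses plain document IDs instead of LangChain documents; the function name `rrf` and the three toy rankings are illustrative only.

```python
def rrf(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion over lists of doc IDs (rank 1 = best)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

# Three query variants, each returning its own top-3.
rankings = [
    ["a", "b", "c"],
    ["b", "a", "d"],
    ["b", "c", "a"],
]
fused = rrf(rankings)
for doc_id, score in fused:
    print(doc_id, round(score, 5))
```

Here "b" scores 1/62 + 1/61 + 1/61 and "a" scores 1/61 + 1/62 + 1/63, so "b" edges ahead even though "a" also appears in every list. With k = 60 the gap between adjacent ranks is small, which is why appearing in many lists matters more than topping a single one.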
🤔

Self-RAG — Adaptive Retrieval with Reflection Hard

The LLM decides when to retrieve, critiques its own output, and generates special reflection tokens to self-assess relevance and support.

📝 Query
🔍 Retrieve?
[Retrieve] token
📄 Docs
✅ Relevant?
[ISREL] token
📝 Generate
🎯 Supported?
[ISSUP] token
⭐ Useful?
[ISUSE] token
Python · Self-RAG pattern (simplified)
# Full Self-RAG requires a fine-tuned model (selfrag/selfrag_llama2_7b on HuggingFace)
# This shows the conceptual pattern using prompting

SELF_RAG_PROMPT = """You are a Self-RAG assistant. For each question:
1. Decide if retrieval is needed → output [Retrieve] or [No Retrieve]
2. If retrieved docs are relevant → output [ISREL: yes/no]
3. Generate answer grounded in docs → output [ISSUP: fully/partially/no]
4. Rate your answer → output [ISUSE: 5/4/3/2/1]"""

def self_rag(query: str, vectorstore) -> dict:
    # First pass: decide if retrieval is needed
    decision = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SELF_RAG_PROMPT},
            {"role": "user", "content": f"Question: {query}\nShould I retrieve? Output [Retrieve] or [No Retrieve]."}
        ]
    ).choices[0].message.content

    if "[Retrieve]" in decision:
        docs = vectorstore.similarity_search(query, k=3)
        context = "\n".join(d.page_content for d in docs)
    else:
        context = ""  # Answer from parametric knowledge

    # Generate with self-critique tokens (the system prompt defines the token format)
    answer = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SELF_RAG_PROMPT},
            {"role": "user", "content": f"Context: {context}\nQ: {query}\nAnswer + critique tokens:"}
        ]
    ).choices[0].message.content

    return {"answer": answer, "retrieved": bool(context), "context": context}
🔄

CRAG — Corrective RAG Medium

When retrieved docs score low relevance, CRAG automatically falls back to web search and re-ranks before generating.

📝 Query
🔍 Retrieve
⚖️ Score Relevance
Score > 0.7?
✅→
📝 Generate
Score < 0.7 → 🌐 Web Search
🔀 Re-rank + Strip
📝 Generate
Python · CRAG with LangGraph
from langgraph.graph import StateGraph, END
from langchain_core.documents import Document
from langchain_community.tools.tavily_search import TavilySearchResults

web_search = TavilySearchResults(max_results=3)

def grade_documents(state):
    """Score retrieval relevance; flag for web search if low."""
    docs, question = state["documents"], state["question"]
    filtered, web_needed = [], False
    for doc in docs:
        grade = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content":
                f"Is this document relevant to '{question}'?\nDoc: {doc.page_content[:500]}\nAnswer yes/no"}]
        ).choices[0].message.content.lower()
        if "yes" in grade:
            filtered.append(doc)
        else:
            web_needed = True
    return {"documents": filtered, "web_search": web_needed, "question": question}

def web_search_node(state):
    results = web_search.invoke(state["question"])
    new_docs = [Document(page_content=r["content"]) for r in results]
    return {"documents": state["documents"] + new_docs}

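# Build CRAG graph (retrieve_node and generate_node are assumed to be defined elsewhere)
workflow = StateGraph(dict)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("web_search", web_search_node)
workflow.add_node("generate", generate_node)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges("grade_documents", lambda s: "web_search" if s["web_search"] else "generate")
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)
app = workflow.compile()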
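The flow diagram above routes on a numeric relevance score (0.7), while the LangGraph example grades with yes/no answers. The sketch below shows the threshold variant in isolation. It assumes each document already carries a relevance score in [0, 1] from some grader, and it assumes that one document above the threshold is enough to skip web search; both choices, and the name `grade_and_route`, are illustrative.

```python
def grade_and_route(scored_docs: list[tuple[str, float]], threshold: float = 0.7) -> tuple[list[str], str]:
    """Keep docs scoring above the threshold; fall back to web search if none survive."""
    kept = [doc for doc, score in scored_docs if score > threshold]
    next_node = "generate" if kept else "web_search"
    return kept, next_node

good_docs, good_route = grade_and_route([("doc about CRAG", 0.91), ("unrelated doc", 0.35)])
bad_docs, bad_route = grade_and_route([("unrelated doc", 0.35), ("off-topic doc", 0.10)])
print(good_route, bad_route)
```

A stricter policy (for example, requiring at least two surviving documents) only changes the `next_node` condition.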

GraphRAG & Knowledge Graph RAG

Standard RAG retrieves isolated text chunks. GraphRAG builds a knowledge graph first, enabling multi-hop reasoning across connected entities.

[Knowledge graph diagram. Entity nodes: Claude (AI Model), Anthropic (Company), RAG (Technique), Dario (Person), LangChain (Framework). Relationships: made_by, uses, founded_by, implemented_in, collaborates.]

🏢 Microsoft GraphRAG

Clusters entities using the Leiden algorithm, generates community summaries at each level, enables global & local search modes.

Leiden Clustering · Community Summaries · Global Search · Local Search

🦙 LlamaIndex KG Index

Auto-extracts (subject, predicate, object) triples using an LLM, stores in NetworkX/Neo4j, retrieves via keyword or embedding search on graph.

SPO Triples · Neo4j · NetworkX · Multi-hop
Python · LlamaIndex Knowledge Graph
from llama_index.core import KnowledgeGraphIndex, SimpleDirectoryReader, StorageContext
from llama_index.core.graph_stores import SimpleGraphStore
from llama_index.llms.openai import OpenAI as LlamaOpenAI

# Load documents
documents = SimpleDirectoryReader("./data").load_data()

# The graph store is attached through a StorageContext
storage_context = StorageContext.from_defaults(graph_store=SimpleGraphStore())

# Build KG Index: the LLM extracts SPO triples automatically
kg_index = KnowledgeGraphIndex.from_documents(
    documents,
    llm=LlamaOpenAI(model="gpt-4o"),
    max_triplets_per_chunk=10,
    include_embeddings=True,   # hybrid: graph + vector
    storage_context=storage_context,
)

# Query with graph traversal
query_engine = kg_index.as_query_engine(
    include_text=True,
    response_mode="tree_summarize",
    embedding_mode="hybrid",   # keyword + vector on graph
    similarity_top_k=5,
)

# Multi-hop example: first resolve "the company that built Claude", then ask who founded it
response = query_engine.query("Who founded the company that built Claude?")
print(response)

# Visualize the graph
graph = kg_index.get_networkx_graph()  # → export to Gephi / pyvis
🌟

When to use GraphRAG: Multi-hop questions ("Who is the CEO of the company that built X?"), relationship queries, large document sets with cross-document dependencies (e.g. medical literature, legal case networks).
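Multi-hop retrieval is easiest to see on a tiny triple store. The sketch below is a toy illustration, not how GraphRAG or LlamaIndex store graphs: the three triples are assumed example edges built from the entity and relationship names in the diagram, and `build_graph` and `follow` are hypothetical helpers.

```python
from collections import defaultdict

# Assumed example edges (subject, predicate, object) for illustration only.
triples = [
    ("Claude", "made_by", "Anthropic"),
    ("Anthropic", "founded_by", "Dario"),
    ("RAG", "implemented_in", "LangChain"),
]

def build_graph(triples: list[tuple[str, str, str]]) -> dict:
    """Index triples as subject -> predicate -> object (one object per pair, for simplicity)."""
    graph = defaultdict(dict)
    for subject, predicate, obj in triples:
        graph[subject][predicate] = obj
    return graph

def follow(graph: dict, start: str, path: list[str]):
    """Walk a chain of predicates from `start`; return None if any hop is missing."""
    node = start
    for predicate in path:
        node = graph.get(node, {}).get(predicate)
        if node is None:
            return None
    return node

graph = build_graph(triples)
# "Who founded the company that made Claude?" = made_by, then founded_by
answer = follow(graph, "Claude", ["made_by", "founded_by"])
print(answer)
```

A chunk-only retriever would need both facts to appear in the same passage; the graph answers the question by chaining two edges that may come from different documents.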

Agentic RAG

Agentic RAG combines retrieval with autonomous tool use, planning, and multi-step reasoning. The LLM acts as an agent that decides what to retrieve and when.

[Diagram: 🤖 Agent (LLM + ReAct) connected to its tools: 🔍 Vector Search, 🌐 Web Search, 🐍 Code Exec, 📊 SQL Query, 📧 Email/API, 🧮 Calculator]
Python · ReAct Agentic RAG with LangGraph
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool

@tool
def search_docs(query: str) -> str:
    """Search internal knowledge base for relevant documents."""
    results = vectorstore.similarity_search(query, k=3)
    return "\n\n".join(r.page_content for r in results)

@tool
def web_search(query: str) -> str:
    """Search the web for current information not in the knowledge base."""
    results = tavily.invoke(query)
    return str(results[:2])

@tool
def execute_python(code: str) -> str:
    """Execute Python code for calculations and data analysis."""
    import subprocess
    import sys
    # WARNING: a subprocess with a timeout is NOT a sandbox; run this inside an isolated container or VM.
    result = subprocess.run([sys.executable, "-c", code], capture_output=True, text=True, timeout=10)
    return result.stdout or result.stderr

# Build ReAct agent — it autonomously decides which tools to call and when
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_react_agent(
    model=llm,
    tools=[search_docs, web_search, execute_python],
    prompt="You are a research assistant. Use tools to answer questions accurately."
)

# The agent will:
# 1. Think: "I need to search docs first"
# 2. Act: call search_docs
# 3. Observe: get results
# 4. Think: "I need more recent data"
# 5. Act: call web_search
# 6. Observe: get results
# 7. Generate: synthesize final answer

result = agent.invoke({"messages": [("user", "What is the latest RAG benchmark score for GPT-4o?")]})
⚠️

Agentic RAG risks: Infinite loops (cap max iterations at 10), cost blowup (each tool call = tokens), prompt injection via retrieved docs. Always sandbox code execution and validate tool outputs.
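The iteration cap mentioned above can be sketched as a framework-free loop. Everything here is a stand-in: `decide` plays the role of the LLM policy, the stub tool returns canned text, and the names `run_agent`, `looping_policy`, and `good_policy` are hypothetical. Agent frameworks usually expose an equivalent setting (LangGraph, for example, accepts a `recursion_limit` entry in the run config).

```python
from typing import Callable

def run_agent(
    decide: Callable[[list[str]], tuple[str, str]],
    tools: dict[str, Callable[[str], str]],
    max_steps: int = 10,
) -> dict:
    """Run a tool loop; `decide(history)` returns ("final", answer) or (tool_name, tool_input)."""
    history: list[str] = []
    for step in range(max_steps):
        action, payload = decide(history)
        if action == "final":
            return {"answer": payload, "steps": step, "hit_limit": False}
        if action not in tools:
            history.append(f"error: unknown tool {action!r}")
            continue
        history.append(f"{action} -> {tools[action](payload)}")
    # The policy never produced a final answer within the budget.
    return {"answer": None, "steps": max_steps, "hit_limit": True}

tools = {"search_docs": lambda q: f"stub result for {q}"}

def looping_policy(history: list[str]) -> tuple[str, str]:
    return ("search_docs", "rag")  # never finishes

def good_policy(history: list[str]) -> tuple[str, str]:
    return ("final", "done") if history else ("search_docs", "rag")

stuck = run_agent(looping_policy, tools, max_steps=10)
ok = run_agent(good_policy, tools)
print(stuck, ok)
```

Returning an explicit `hit_limit` flag lets the caller distinguish "no answer found" from "budget exhausted" and log the latter as an incident.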

Multimodal RAG

Retrieve and reason over images, charts, PDFs, audio, and video — not just text.

🖼️ ColPali (2024)

Embeds entire PDF page screenshots with PaliGemma. No OCR needed. Best for charts, diagrams, scanned docs. Introduced alongside the ViDoRe visual document retrieval benchmark, where it led at release.

Page-level · No OCR · PaliGemma

🤖 Vision LLM RAG

Describe images/charts with GPT-4o Vision, store descriptions as text chunks, retrieve and feed original image to LLM for generation.

GPT-4o Vision · Hybrid Store · Claude 3.7

🎧 Audio RAG

Whisper transcription → chunk → embed → retrieve. Add speaker diarization (pyannote) for meeting/podcast Q&A.

Whisper · Diarization · Timestamps
Python · Multi-modal RAG (image + text)
import base64
from pathlib import Path

def describe_image(image_path: str) -> str:
    """Use GPT-4o Vision to generate a rich text description of an image."""
    img_data = base64.b64encode(Path(image_path).read_bytes()).decode()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this image in detail, including all text, data, charts, and visual elements."},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_data}"}}
            ]
        }]
    )
    return response.choices[0].message.content

# Index: store both description (for retrieval) and image path (for generation)
def index_images(image_paths: list[str]):
    for path in image_paths:
        description = describe_image(path)
        vectorstore.add_texts(
            texts=[description],
            metadatas=[{"image_path": path, "type": "image"}]
        )

# Retrieve: get description, return original image to the LLM
def multimodal_rag(query: str) -> str:
    results = vectorstore.similarity_search(query, k=2, filter={"type": "image"})
    images_b64 = []
    for doc in results:
        img_data = base64.b64encode(Path(doc.metadata["image_path"]).read_bytes()).decode()
        images_b64.append(img_data)

    messages = [{"role": "user", "content": [
        {"type": "text", "text": f"Answer using these images: {query}"},
        *[{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img}"}} for img in images_b64]
    ]}]
    return client.chat.completions.create(model="gpt-4o", messages=messages).choices[0].message.content

RAG Frameworks

Don't reinvent the wheel. Pick a framework based on your use-case, then customize.

Framework | Best For | Learning Curve | Production Ready | Unique Feature
LlamaIndex | Data-intensive RAG, structured data, agents | Medium | ✓ Yes | Property Graph Index, 100+ data loaders
LangChain | Chains, agents, broad ecosystem | Medium | ✓ Yes | LCEL, LangSmith tracing, 600+ integrations
Haystack | Production NLP pipelines, search | Low | ✓ Yes | Pipeline YAML config, Haystack Hub
DSPy | Optimising prompts & RAG pipelines | High | ✓ Yes | Automatic prompt optimization (MIPRO, BootstrapFewShot)
Ragas | Evaluating RAG quality (not building) | Low | ✓ Yes | Automated faithfulness/relevancy/context metrics
Python · LlamaIndex Advanced Pipeline
from qdrant_client import QdrantClient
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.postprocessor import SentenceTransformerRerank
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore

embed_model = OpenAIEmbedding(model="text-embedding-3-small")
vector_store = QdrantVectorStore(
    client=QdrantClient(url="http://localhost:6333"),  # assumes a local Qdrant instance
    collection_name="docs",
)

# Ingestion pipeline: transform docs → nodes → embeddings → store
pipeline = IngestionPipeline(
    transformations=[
        SemanticSplitterNodeParser(embed_model=embed_model, breakpoint_percentile_threshold=95),
        embed_model,  # embed nodes
    ],
    vector_store=vector_store,
)

nodes = pipeline.run(documents=SimpleDirectoryReader("./docs").load_data())

# Query with reranking (cross-encoder re-scores top-20 → return top-5)
# The pipeline already wrote the nodes to Qdrant, so build the index on top of the store.
index = VectorStoreIndex.from_vector_store(vector_store, embed_model=embed_model)
reranker = SentenceTransformerRerank(model="cross-encoder/ms-marco-MiniLM-L-6-v2", top_n=5)

query_engine = index.as_query_engine(
    similarity_top_k=20,          # retrieve 20
    node_postprocessors=[reranker], # rerank → 5
    response_mode="compact"
)

response = query_engine.query("Explain RAG-Fusion")
print(response.source_nodes[0].score)  # rerank score

Evaluation Frameworks

You can't improve what you don't measure. These four metrics cover the full RAG quality surface.

📐 Faithfulness

Is every claim in the answer supported by the retrieved context? Catches hallucinations introduced by the generator.

0.92 ✓

🎯 Answer Relevance

Does the answer actually address what was asked? Penalises verbose or off-topic responses.

0.78 ~

🔍 Context Precision

Of everything retrieved, how much was actually needed? High noise = low precision.

0.85 ✓

📦 Context Recall

Did the retrieved context contain all the information needed to answer correctly?

0.70 ⚠
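The two retrieval-side metrics reduce to simple ratios when ground-truth chunk labels are available. The sketch below is a set-based simplification for intuition only: evaluation frameworks such as Ragas typically estimate these with an LLM judge rather than labeled IDs, and the chunk IDs and function names here are made up.

```python
def context_precision(retrieved: list[str], relevant: set[str]) -> float:
    """Share of retrieved chunks that were actually needed."""
    if not retrieved:
        return 0.0
    return sum(1 for chunk_id in retrieved if chunk_id in relevant) / len(retrieved)

def context_recall(retrieved: list[str], relevant: set[str]) -> float:
    """Share of needed chunks that made it into the retrieved set."""
    if not relevant:
        return 1.0  # nothing was needed, so nothing was missed
    return len(set(retrieved) & relevant) / len(relevant)

retrieved = ["c1", "c2", "c3", "c4"]   # what the retriever returned
relevant = {"c1", "c3", "c9"}          # what a correct answer needs

precision = context_precision(retrieved, relevant)
recall = context_recall(retrieved, relevant)
print(precision, recall)
```

Here two of four retrieved chunks are useful (precision 0.5) and two of three needed chunks were found (recall about 0.67). Raising top-k usually lifts recall at the expense of precision.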
Python · RAGAS Evaluation
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall
from datasets import Dataset

# Build evaluation dataset
eval_data = Dataset.from_dict({
    "question":  ["What is RAG?", "Who invented the Transformer?"],
    "answer":    ["RAG augments LLMs with external retrieval...", "The Transformer was introduced by Vaswani et al..."],
    "contexts":  [
        ["RAG stands for Retrieval-Augmented Generation..."],
        ["Attention Is All You Need, Vaswani et al., 2017..."]
    ],
    "ground_truth": ["RAG uses retrieval to augment generation.", "Vaswani et al. invented the Transformer in 2017."]
})

results = evaluate(
    eval_data,
    metrics=[faithfulness, answer_relevancy, context_precision, context_recall]
)

print(results.to_pandas()[["faithfulness","answer_relevancy","context_precision","context_recall"]])
# One row per question. Illustrative shape only; actual scores depend on the judge model:
# faithfulness  answer_relevancy  context_precision  context_recall
#         0.92              0.87               0.91            0.78
Python · DeepEval (Unit Test Style)
from deepeval import assert_test
from deepeval.metrics import FaithfulnessMetric, HallucinationMetric, AnswerRelevancyMetric
from deepeval.test_case import LLMTestCase

# DeepEval integrates with pytest — RAG quality as CI/CD gates
def test_rag_faithfulness():
    docs = ["CRAG corrects retrieval by scoring doc relevance..."]
    test_case = LLMTestCase(
        input="What is CRAG?",
        actual_output="CRAG uses a relevance grader to decide when to use web search as fallback.",
        retrieval_context=docs,  # used by FaithfulnessMetric
        context=docs,            # required by HallucinationMetric
    )

    faithfulness = FaithfulnessMetric(threshold=0.8, model="gpt-4o", include_reason=True)
    hallucination = HallucinationMetric(threshold=0.2)

    assert_test(test_case, [faithfulness, hallucination])
    # Fails CI if faithfulness < 0.8 or hallucination > 0.2

Production Considerations

⚡ Latency Optimization

  • Embedding cache — Redis TTL on frequent queries (50–80% cache hit typical)
  • HNSW tuning — ef=64 balances recall (98%) vs latency (<5ms)
  • Async retrieval — asyncio.gather for parallel chunk fetches
  • Streaming — stream LLM tokens, don't wait for full response
  • Quantized embeddings — int8 Cohere v3 = 4× smaller, <1% quality loss
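The 4× figure for int8 embeddings follows directly from storing one byte per dimension instead of a four-byte float. The sketch below uses a simple symmetric per-vector scale to show the size and error trade-off. It is an illustrative scheme, not the method a particular provider uses, and `quantize_int8` is a hypothetical helper.

```python
from array import array

def quantize_int8(vec: list[float]) -> tuple[list[int], float]:
    """Map floats to integers in [-127, 127] using one scale factor per vector."""
    peak = max((abs(x) for x in vec), default=0.0)
    scale = peak / 127 if peak else 1.0
    return [round(x / scale) for x in vec], scale

def dequantize(codes: list[int], scale: float) -> list[float]:
    return [c * scale for c in codes]

vec = [0.12, -0.5, 0.33, 0.0]
codes, scale = quantize_int8(vec)
restored = dequantize(codes, scale)

float_bytes = array("f", vec).itemsize * len(vec)      # float32 storage
int_bytes = array("b", codes).itemsize * len(codes)    # int8 storage
max_error = max(abs(a - b) for a, b in zip(vec, restored))
print(codes, float_bytes, int_bytes, max_error)
```

The reconstruction error per component is bounded by half the scale step, which is why ranking quality usually changes little; the scale factor must be stored alongside the codes.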

💰 Cost Control

  • MRL truncation — 256 dims = 12× cheaper than 3072, ~2% accuracy loss
  • Small embed model — nomic-embed (free, local) for non-critical paths
  • Context window discipline — top-3 chunks, not top-20
  • Generator routing — GPT-4o-mini for simple Qs, GPT-4o for complex
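  • Batch indexing — embed offline through the Batch API (about half the per-token price); model choice matters more: text-embedding-3-small is $0.00002/1K tokens vs $0.00013/1K for text-embedding-3-large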

🔐 Security

  • Prompt injection — retrieved docs can inject instructions; use guardrails
  • Access control — filter vectorstore by user permissions before retrieval
  • PII in docs — scan chunks before indexing (Presidio, AWS Comprehend)
  • Source attribution — always return source URLs/page numbers for auditing

📊 Observability

  • LangSmith — trace every retrieval+generation call end-to-end
  • Arize Phoenix — open-source LLM observability, embedding drift
  • Key metrics — latency P50/P95/P99, token cost/query, retrieval hit rate
  • Embedding drift — alert when query dist drifts from index dist
Python · Production RAG with caching + streaming
import asyncio, hashlib, json
import redis
from openai import AsyncOpenAI

aclient = AsyncOpenAI()
cache = redis.Redis(host="localhost", port=6379)

def cache_key(text: str) -> str:
    return f"emb:{hashlib.sha256(text.encode()).hexdigest()}"

async def embed_cached(text: str) -> list[float]:
    """Embedding with Redis cache; avoids re-embedding identical queries."""
    key = cache_key(text)
    if cached := cache.get(key):
        return json.loads(cached)  # JSON, never eval(): cached bytes must not be executed
    emb = (await aclient.embeddings.create(model="text-embedding-3-small", input=text)).data[0].embedding
    cache.setex(key, 3600, json.dumps(emb))  # 1hr TTL
    return emb

async def rag_stream(question: str, vectorstore):
    """Full async RAG with streaming generator output."""
    q_emb = await embed_cached(question)
    docs = await asyncio.to_thread(vectorstore.similarity_search_by_vector, q_emb, k=3)
    context = "\n\n".join(d.page_content for d in docs)

    prompt = f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"
    stream = await aclient.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True   # ← stream tokens as they arrive
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content  # stream to client

Claude for RAG Applications

Claude models are a top choice for the generation step in RAG, especially for tasks requiring careful reasoning, long-context handling, and instruction following.

Model | Released | Context | RAG Sweet Spot | Cost (input/1M)
Claude 1.x | Mar 2023 | 9K | Historical baseline | N/A (retired)
Claude 2.1 | Nov 2023 | 200K | Long-doc RAG | $8
Claude 3 Haiku | Mar 2024 | 200K | High-throughput, cost-sensitive | $0.25
Claude 3 Sonnet | Mar 2024 | 200K | Balanced RAG workloads | $3
Claude 3 Opus | Mar 2024 | 200K | Complex multi-hop reasoning | $15
Claude 3.5 Sonnet | Jun 2024 | 200K | Best price/performance for RAG | $3
Claude 3.5 Haiku | Nov 2024 | 200K | Fast, cheap agentic RAG | $0.80
Claude 3.7 Sonnet | Feb 2025 | 200K | Extended thinking, multi-step RAG | $3
Claude 4 Series | 2025 | 200K+ | Most capable generation step | Varies
Python · Claude RAG with Extended Thinking
import anthropic

client = anthropic.Anthropic()

def claude_rag(question: str, context_docs: list[str]) -> str:
    """
    Claude 3.7 Sonnet with extended thinking — ideal for complex
    multi-step RAG where reasoning quality matters most.
    """
    context = "\n\n---\n\n".join(context_docs)

    response = client.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=8000,
        thinking={"type": "enabled", "budget_tokens": 5000},  # extended thinking
        messages=[{
            "role": "user",
            "content": f"""You are a research assistant. Use ONLY the provided context.

<context>
{context}
</context>

<question>
{question}
</question>

Answer with citations. If the context is insufficient, say so."""
        }]
    )

    # Extract thinking + answer separately
    thinking_text = next((b.thinking for b in response.content if b.type == "thinking"), "")
    answer = next(b.text for b in response.content if b.type == "text")
    return answer

# Also works with prompt caching — reduces costs up to 90% on repeated context
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [{
            "type": "text",
            "text": large_context,
            "cache_control": {"type": "ephemeral"}  # cache the context prefix
        }, {
            "type": "text",
            "text": question
        }]
    }]
)
🔍

On "unreleased" Anthropic models: Anthropic does not publicly disclose models that are in development or trained but not released. Any specific claims about named unreleased models circulating online are speculation, not official announcements. The safest source is anthropic.com/news and the official API model list.

Which RAG Technique Should I Use?

🎯

Query is vague / short

Use HyDE — generate a hypothetical answer and use that embedding for retrieval.

🔀

Missing relevant docs

Use RAG-Fusion — multiple query variants catch documents phrased differently.

Fresh info needed

Use CRAG — auto-fallback to web search when internal docs score low relevance.

🌐

Multi-hop questions

Use GraphRAG — traverse entity relationships across documents.

🤖

Complex, multi-step tasks

Use Agentic RAG — LLM plans and uses tools autonomously.

🖼️

Charts / PDFs / images

Use ColPali or Vision RAG — page-level visual embeddings or describe-then-retrieve.

Part II

Amazon Web Services

Complete practical guide — every major service, when to use it, real code, architecture patterns, and cost tips.

200+ AWS Services · 33% Cloud market share · 12 Services deep-dived · 20+ Code examples

The AWS Mental Model

AWS is a collection of building blocks. The key is knowing which block solves which problem — don't memorise all 200+ services, learn the 15 that cover 95% of real workloads.

[Architecture diagram: 🌐 Internet, CloudFront (CDN / WAF), Route 53 (DNS), VPC (Virtual Private Cloud), ALB (Load Balancer), ECS / EC2 (Compute), Lambda (Serverless), RDS / Aurora (SQL Database), DynamoDB (NoSQL), ElastiCache (Redis / Cache), S3, SQS / SNS, Secrets Manager]
Category | Service | One-liner | When NOT to use
Compute | EC2 | Virtual machine — full OS control | Short-lived tasks (<15 min) → use Lambda
Compute | Lambda | Function-as-a-service, event-driven | Long-running processes (>15 min) → use ECS
Containers | ECS / EKS | Run Docker containers at scale | Simple apps — over-engineering
Storage | S3 | Unlimited object storage | Frequent random reads/writes → use EFS/EBS
Database | RDS / Aurora | Managed PostgreSQL / MySQL | Massive scale >100k writes/s → DynamoDB
Database | DynamoDB | Managed NoSQL, millisecond latency | Complex joins / ACID transactions → RDS
Cache | ElastiCache | Managed Redis / Memcached | Persistent data → it's a cache, not a DB
Networking | CloudFront | CDN + WAF + edge caching | Internal-only APIs with no public traffic
Messaging | SQS | Durable message queue, decouples services | Real-time fanout → use SNS or EventBridge
Security | IAM | Identity, roles, permissions for everything | Never skip — always use least-privilege
Security | Secrets Manager | Store API keys, DB passwords securely | Public config values → use SSM Parameter Store
Observability | CloudWatch | Logs, metrics, alarms, dashboards | Complex APM needs → pair with X-Ray or Datadog

S3 — Simple Storage Service

Virtually unlimited object storage. Durability: 99.999999999% (11 nines). The backbone of almost every AWS architecture.

🗂️ Storage Classes

  • Standard — hot data, frequent access, $0.023/GB
  • Standard-IA — infrequent access, 40% cheaper
  • Intelligent-Tiering — auto-moves between tiers
  • Glacier — archival, retrieval in mins/hours, $0.004/GB
  • Glacier Deep Archive — $0.00099/GB, 12hr retrieval
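
To make the price gaps concrete, here is a rough storage-only estimate built from the per-GB figures in the list above. It is a sketch under stated assumptions: prices are treated as USD per GB per month, the Standard-IA price is derived from the "40% cheaper" note, and request, retrieval, and minimum-duration charges are ignored. The names `PRICE_PER_GB` and `monthly_storage_cost` are made up for this example.

```python
# Per-GB monthly prices from the list above (assumed USD, single region).
PRICE_PER_GB = {
    "STANDARD": 0.023,
    "STANDARD_IA": 0.023 * 0.60,   # "40% cheaper" than Standard
    "GLACIER": 0.004,
    "DEEP_ARCHIVE": 0.00099,
}

def monthly_storage_cost(size_gb: float, storage_class: str) -> float:
    """Storage-only cost; ignores requests, retrieval and minimum durations."""
    return round(size_gb * PRICE_PER_GB[storage_class], 2)

# Compare the classes for a 5 TB archive
for cls in PRICE_PER_GB:
    print(f"{cls:13s} ${monthly_storage_cost(5_000, cls):>8.2f} / month")
```

Retrieval fees can outweigh the storage savings for data that is read often, so treat the result as a lower bound for the colder classes.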

🔒 Access Control

  • Bucket Policy — JSON, controls access at bucket level
  • IAM Policy — controls which identities can access
  • Pre-signed URLs — temporary access without credentials
  • Block Public Access — always enable on account level
  • S3 Object Lock — WORM compliance (financial, medical)

⚡ Power Features

  • S3 Select — query CSV/JSON inside objects (no download)
  • Event Notifications — trigger Lambda on upload
  • Multipart Upload — required for objects >5 GB, recommended above ~100 MB
  • Transfer Acceleration — routes uploads through CloudFront edge locations; the speed-up grows with distance from the bucket's region
  • Replication (CRR/SRR) — cross-region / same-region
Python · boto3 S3 — upload, pre-signed URL, lifecycle
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3", region_name="us-east-1")

# ── Upload a file ──
s3.upload_file(
    Filename="report.pdf",
    Bucket="my-bucket",
    Key="reports/2025/report.pdf",
    ExtraArgs={
        "ContentType": "application/pdf",
        "ServerSideEncryption": "AES256",   # always encrypt at rest
        "StorageClass": "STANDARD_IA",       # cheaper for infrequent reads
    }
)

# ── Generate pre-signed URL (expires in 1 hour) ──
url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-bucket", "Key": "reports/2025/report.pdf"},
    ExpiresIn=3600
)
print(url)  # share with client — no AWS credentials needed

# ── Multipart upload for large files ──
from boto3.s3.transfer import TransferConfig
config = TransferConfig(multipart_threshold=25 * 1024 * 1024)  # 25MB threshold
s3.upload_file("bigfile.zip", "my-bucket", "uploads/bigfile.zip", Config=config)

# ── Set lifecycle rule — move to Glacier after 90 days, delete after 365 ──
s3.put_bucket_lifecycle_configuration(
    Bucket="my-bucket",
    LifecycleConfiguration={"Rules": [{
        "ID": "archive-old-reports",
        "Status": "Enabled",
        "Filter": {"Prefix": "reports/"},
        "Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
        "Expiration": {"Days": 365}
    }]}
)

# ── Trigger Lambda on new object ──
# (done in S3 console → Event notifications → Lambda function)
# Lambda handler receives: event["Records"][0]["s3"]["bucket"]["name"] + key
💰

S3 Cost Tips: Enable S3 Intelligent-Tiering for any bucket where access patterns are unknown — it costs $0.0025/1K objects/month but can save 40-90% on storage. Always enable Block Public Access. Use server-side encryption (free). Request counts cost money — batch small operations.

EC2 — Elastic Compute Cloud

Virtual machines in the cloud. You choose the OS, CPU, RAM, storage. The most flexible compute option — but also the most to manage.

Family | Type | Use Case | Example | vCPU / RAM
General | t4g, m7g | Web servers, small DBs, dev/staging | t3.micro | 2 / 1 GB
Compute | c7g, c6i | High-CPU: encoding, ML inference, HPC | c6i.2xlarge | 8 / 16 GB
Memory | r7g, x2gd | In-memory DBs, large caches, SAP | r6g.2xlarge | 8 / 64 GB
GPU | p4d, g5 | ML training, video rendering, CUDA | g5.xlarge | 4 / 16 GB + A10G
Storage | i4i, d3 | High I/O, data warehousing, Hadoop | i4i.xlarge | 4 / 32 GB + NVMe

💰 Pricing Models

  • On-Demand — pay per second, no commitment. Most expensive.
  • Reserved (1-3yr) — up to 72% off. Predictable workloads.
  • Spot — up to 90% off. Can be interrupted. Batch/ML workloads.
  • Savings Plans — flexible, up to 66% off. Applies across instance families.
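
A quick way to compare the pricing models is to apply each quoted maximum discount to an on-demand rate. This is only a best-case sketch: the discounts are the upper bounds from the list above, the 730-hour month is a common approximation, and the on-demand rate you pass in is your own input. `MAX_DISCOUNT` and `best_case_monthly_cost` are hypothetical names.

```python
# Maximum discounts quoted in the list above; real discounts vary by
# instance type, region, term and payment option.
MAX_DISCOUNT = {"on_demand": 0.0, "reserved": 0.72, "spot": 0.90, "savings_plan": 0.66}
HOURS_PER_MONTH = 730

def best_case_monthly_cost(on_demand_hourly: float, model: str, instances: int = 1) -> float:
    """Monthly cost if the full quoted discount applied for the whole month."""
    rate = on_demand_hourly * (1 - MAX_DISCOUNT[model])
    return round(rate * HOURS_PER_MONTH * instances, 2)

# Example: an instance with a made-up $0.10/hour on-demand rate
for model in MAX_DISCOUNT:
    print(model, best_case_monthly_cost(0.10, model))
```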

💾 Storage (EBS)

  • gp3 — default, 3000 IOPS baseline. $0.08/GB/month.
  • io2 Block Express — up to 256K IOPS. Databases.
  • st1 — throughput-optimised HDD. Log processing.
  • sc1 — cold HDD. Cheapest, low access frequency.

🛡️ Auto Scaling

  • Launch Template — defines the AMI, instance type, SG, IAM role
  • ASG — min/max/desired count, scaling policies
  • Target Tracking — keep CPU at 70%, auto-adds instances
  • Warm Pools — pre-initialized instances, sub-30s scale-out
Bash · Launch EC2 + configure with user-data script
# Launch an EC2 instance (al2023, t3.micro, us-east-1)
aws ec2 run-instances \
  --image-id ami-0c02fb55956c7d316 \
  --instance-type t3.micro \
  --key-name my-keypair \
  --security-group-ids sg-xxxxxxxxxx \
  --subnet-id subnet-xxxxxxxxxx \
  --iam-instance-profile Name=MyAppRole \
  --user-data '#!/bin/bash
    yum update -y
    yum install -y docker
    systemctl start docker
    systemctl enable docker
    docker pull my-app:latest
    docker run -d -p 80:8000 my-app:latest' \
  --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=my-app}]' \
  --count 1

# Connect via SSM Session Manager (no SSH key needed — more secure)
aws ssm start-session --target i-xxxxxxxxxxxxxxxxx

# Create a snapshot of EBS volume
aws ec2 create-snapshot \
  --volume-id vol-xxxxxxxxxx \
  --description "Before deployment snapshot $(date +%Y-%m-%d)"

# Allocate and associate Elastic IP
EIP=$(aws ec2 allocate-address --domain vpc --query AllocationId --output text)
aws ec2 associate-address --instance-id i-xxxxxxxxxx --allocation-id $EIP

Lambda — Serverless Functions

Run code without managing servers. You pay only for the milliseconds your function actually runs. Scales to zero, scales to millions.

[Diagram: API Gateway, S3 Event, and SQS / SNS trigger a λ Lambda function (Python / Node / Go / Java), which writes to DynamoDB, S3, or SES / SNS]

Lambda Limits (know these!)

  • ⏱ Max timeout: 15 minutes
  • 💾 Max memory: 10 GB
  • 📦 Deployment package: 50 MB (250 MB unzipped)
  • 🔁 Concurrency: 1000 default (can increase)
  • 💰 First 1M requests/month FREE forever
Python · Lambda handler patterns
import json, boto3, os

# ── Pattern 1: API Gateway trigger (REST API) ──
def handler_api(event, context):
    body = json.loads(event.get("body") or "{}")  # body is None when the request has no payload
    name = body.get("name", "World")
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"message": f"Hello, {name}!"})
    }

# ── Pattern 2: S3 trigger (process uploaded file) ──
from urllib.parse import unquote_plus

def handler_s3(event, context):
    s3 = boto3.client("s3")
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 events (spaces become "+")
        key    = unquote_plus(record["s3"]["object"]["key"])
        obj    = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read().decode("utf-8")
        print(f"Processing {key}: {len(content)} characters")
        # ... process and write result back to S3

# ── Pattern 3: SQS trigger (process messages) ──
def handler_sqs(event, context):
    for record in event["Records"]:
        message = json.loads(record["body"])
        process_job(message)   # process_job is your own job function
    # When the handler returns without raising, the Lambda event source
    # mapping deletes the whole batch from the queue

# ── Powertools for best practices ──
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit

logger  = Logger(service="my-service")
tracer  = Tracer()
metrics = Metrics(namespace="MyApp")

@logger.inject_lambda_context
@tracer.capture_lambda_handler
@metrics.log_metrics
def handler(event, context):
    metrics.add_metric(name="RequestCount", unit=MetricUnit.Count, value=1)
    logger.info("Processing request", extra={"event": event})
    # structured JSON logs + X-Ray traces + CloudWatch metrics — all wired up
🥶

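Cold starts: The first invocation in a new execution environment adds startup latency, typically under 200ms for small Python/Node functions and up to 1–2s for JVM or heavily packaged functions. Fix with Provisioned Concurrency (keeps environments warm, costs roughly $0.015 per GB-hour) or use Lambda SnapStart for JVM. For Python/Node the cold start is usually acceptable.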
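
Because a handler is a plain function, it can be exercised locally with a hand-built event before deploying. The sketch below repeats the API Gateway pattern from this section and calls it with a minimal fake event. The helper `fake_api_event` is hypothetical, and real API Gateway proxy events carry many more fields than shown here.

```python
import json

def handler_api(event, context):
    """API Gateway pattern: JSON body in, JSON body out."""
    body = json.loads(event.get("body") or "{}")   # body is None for empty requests
    name = body.get("name", "World")
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"message": f"Hello, {name}!"}),
    }

def fake_api_event(payload=None):
    """Hypothetical helper: a minimal subset of an API Gateway proxy event."""
    return {
        "httpMethod": "POST",
        "path": "/hello",
        "body": json.dumps(payload) if payload is not None else None,
    }

# Local invocation; context is unused by this handler, so None is enough
print(handler_api(fake_api_event({"name": "Alice"}), None))
print(handler_api(fake_api_event(), None))
```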

Databases — RDS, Aurora, DynamoDB

🐘 RDS / Aurora PostgreSQL

Managed relational DB. AWS claims Aurora delivers up to 5× the throughput of standard MySQL (up to 3× for PostgreSQL), and it automatically replicates across 3 AZs with 6 copies of data.

ACID Auto-backups Read Replicas Multi-AZ failover Aurora Serverless v2
💡

Aurora Serverless v2 scales from 0.5 to 128 ACUs in seconds — perfect for variable workloads. Minimum cost: ~$43/month.

⚡ DynamoDB

Key-value + document NoSQL. Single-digit millisecond at any scale. No schema to manage. Global tables for multi-region active-active.

On-demand pricing Global Tables DynamoDB Streams DAX (microsecond) TTL (auto-expire)
⚠️

Design access patterns first — DynamoDB's single-table design requires knowing queries upfront. Wrong key design = full table scans = $$$.
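
To show what "access patterns first" means in practice, the sketch below builds composite keys and runs a Query-like lookup against an in-memory list instead of a real table. The key layout (`USER#…`, `ORDER#<date>#<id>`) and every function name are illustrative assumptions, not a prescribed schema.

```python
def user_pk(user_id: str) -> str:
    return f"USER#{user_id}"

def profile_sk(user_id: str) -> str:
    return f"PROFILE#{user_id}"

def order_sk(iso_date: str, order_id: str) -> str:
    # ISO dates sort lexicographically, so the sort key also orders by time
    return f"ORDER#{iso_date}#{order_id}"

# In-memory stand-in for the table: items carrying PK and SK attributes
ITEMS = [
    {"PK": user_pk("usr_123"), "SK": profile_sk("usr_123"), "name": "Alice"},
    {"PK": user_pk("usr_123"), "SK": order_sk("2025-01-05", "o1"), "total": 20},
    {"PK": user_pk("usr_123"), "SK": order_sk("2025-03-17", "o2"), "total": 35},
    {"PK": user_pk("usr_456"), "SK": order_sk("2025-02-01", "o3"), "total": 50},
]

def query(pk: str, sk_prefix: str = "", newest_first: bool = False) -> list[dict]:
    """Mimics Query: exact match on PK, begins_with on SK, ordered by SK."""
    hits = [i for i in ITEMS if i["PK"] == pk and i["SK"].startswith(sk_prefix)]
    return sorted(hits, key=lambda i: i["SK"], reverse=newest_first)

# Access pattern: "newest orders for one user" touches a single partition
print(query(user_pk("usr_123"), "ORDER#", newest_first=True))
```

Each access pattern maps to one partition key plus a sort-key prefix. A question the keys cannot express, such as "all orders over $30", would need a scan or a secondary index.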

Python · RDS (psycopg2) + DynamoDB patterns
import json, time
import boto3, psycopg2
from boto3.dynamodb.conditions import Key

# ── RDS PostgreSQL via psycopg2 ──
# Get connection string from Secrets Manager (never hardcode)
def get_db_connection():
    sm = boto3.client("secretsmanager")
    secret = sm.get_secret_value(SecretId="prod/postgres/main")
    creds = json.loads(secret["SecretString"])
    return psycopg2.connect(
        host=creds["host"], port=5432,
        database=creds["dbname"],
        user=creds["username"], password=creds["password"],
        sslmode="require"         # always require SSL on RDS
    )

# ── DynamoDB single-table design ──
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("app-table")

# Write an item
table.put_item(Item={
    "PK": "USER#usr_123",          # partition key
    "SK": "PROFILE#usr_123",       # sort key
    "name": "Alice",
    "email": "alice@example.com",  # placeholder address
    "ttl": int(time.time()) + 86400 * 30,  # auto-expire in 30 days
})

# Query a user's 20 newest orders (one partition key, sort keys starting with ORDER#)
response = table.query(
    KeyConditionExpression=Key("PK").eq("USER#usr_123") & Key("SK").begins_with("ORDER#"),
    ScanIndexForward=False,   # newest first
    Limit=20
)

# Conditional write (optimistic locking)
try:
    table.update_item(
        Key={"PK": "PRODUCT#p1", "SK": "PRODUCT#p1"},
        UpdateExpression="SET stock = stock - :qty",
        ConditionExpression="stock >= :qty",
        ExpressionAttributeValues={":qty": 5}
    )
except dynamodb.meta.client.exceptions.ConditionalCheckFailedException:
    print("Out of stock!")

# DynamoDB Streams → trigger Lambda on every write
# (wired in AWS console / CDK / Terraform)
Factor | Choose RDS/Aurora | Choose DynamoDB
Data model | Complex relations, JOINs, foreign keys | Key-value, documents, simple access patterns
Scale | <100k writes/sec, TB scale | Unlimited writes, single-digit ms at any scale
Query flexibility | Ad-hoc SQL, complex aggregations | Known access patterns only — no ad-hoc
Transactions | Full ACID, multi-table transactions | Limited (up to 100 items per transaction)
Cost at low load | Fixed ~$43+/month minimum | On-demand: pay per request, $0 when idle

VPC, CloudFront, Route 53, ALB

🌐 VPC Architecture Best Practices

CIDR: Use /16 for VPC (65,536 IPs), /24 for subnets (251 usable)
3 AZs: Always deploy across 3 Availability Zones for HA
Public subnets: ALB, NAT Gateway, Bastion hosts only
Private subnets: EC2/ECS/Lambda, RDS, ElastiCache — no direct internet
NAT Gateway: Private subnet → internet egress. $0.045/hr + data transfer
VPC Endpoints: Private connection to S3/DynamoDB — no NAT needed, faster + cheaper
Security Groups: Stateful firewall at instance level (allow inbound = allow return)
NACLs: Stateless firewall at subnet level (must allow both directions)
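
The CIDR arithmetic above can be checked with Python's standard `ipaddress` module. The address range, the AZ names, and the public/private layout below are example assumptions. The 5 reserved addresses per subnet are the reason a /24 yields 251 usable IPs rather than 256.

```python
import ipaddress

AWS_RESERVED_PER_SUBNET = 5  # network, VPC router, DNS, future use, broadcast

vpc = ipaddress.ip_network("10.0.0.0/16")     # example range, an assumption
subnets = list(vpc.subnets(new_prefix=24))    # every /24 inside the VPC

def usable_ips(subnet: ipaddress.IPv4Network) -> int:
    return subnet.num_addresses - AWS_RESERVED_PER_SUBNET

# One public and one private subnet per AZ, for 3 AZs (hypothetical layout)
azs = ["us-east-1a", "us-east-1b", "us-east-1c"]
layout = {
    az: {"public": subnets[i], "private": subnets[i + 10]}
    for i, az in enumerate(azs)
}

print(vpc.num_addresses, len(subnets), usable_ips(subnets[0]))
for az, nets in layout.items():
    print(az, nets["public"], nets["private"])
```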

⚡ CloudFront CDN

450+ edge locations. Serves static assets from cache, proxies dynamic requests to your origin.

# Invalidate CloudFront cache after deploy
aws cloudfront create-invalidation \
  --distribution-id E1234567890ABC \
  --paths "/index.html" "/assets/*"

# Check distribution status
aws cloudfront get-distribution \
  --id E1234567890ABC \
  --query "Distribution.Status"
WAF integration Lambda@Edge S3 OAC HTTPS everywhere Geo-restriction
Python · boto3 Route 53 + ALB health check
import boto3

route53 = boto3.client("route53")

# Update a DNS record (e.g. after new deployment)
route53.change_resource_record_sets(
    HostedZoneId="Z1234567890ABC",
    ChangeBatch={"Changes": [{
        "Action": "UPSERT",
        "ResourceRecordSet": {
            "Name": "api.example.com",
            "Type": "A",
            "AliasTarget": {
                "HostedZoneId": "Z35SXDOTRQ7X7K",   # ALB hosted zone ID
                "DNSName": "my-alb-1234567890.us-east-1.elb.amazonaws.com",
                "EvaluateTargetHealth": True,
            }
        }
    }]}
)

# ALB — register targets and check health
elbv2 = boto3.client("elbv2")

# Get unhealthy targets
response = elbv2.describe_target_health(
    TargetGroupArn="arn:aws:elasticloadbalancing:us-east-1:123456789:targetgroup/my-tg/abc"
)
unhealthy = [t for t in response["TargetHealthDescriptions"] if t["TargetHealth"]["State"] != "healthy"]
print(f"Unhealthy targets: {unhealthy}")

ECS, ECR, EKS

Run Docker containers without managing cluster infrastructure. ECS (simpler) or EKS (Kubernetes-compatible).

📦 ECR — Container Registry

Private Docker registry integrated with ECS/EKS. Supports image scanning, lifecycle policies, cross-region replication.

# Push image to ECR
aws ecr get-login-password --region us-east-1 \
  | docker login --username AWS \
    --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com

docker build -t my-app .
docker tag my-app:latest \
  123456789.dkr.ecr.us-east-1.amazonaws.com/my-app:latest
docker push \
  123456789.dkr.ecr.us-east-1.amazonaws.com/my-app:latest

🚀 ECS Fargate

Serverless containers — no EC2 instances to manage. Define CPU/memory, ECS handles placement, scaling, updates.

No cluster management Pay per task IAM task roles CloudWatch Logs Service Connect

☸️ EKS — Kubernetes

Managed Kubernetes control plane. Choose EKS when you need Helm, RBAC, custom controllers, or multi-cloud portability.

Helm charts IRSA (pod IAM) Karpenter autoscaler Fargate profiles
Python · Deploy new ECS service version (rolling update)
import boto3

ecs = boto3.client("ecs", region_name="us-east-1")

def deploy_new_version(cluster: str, service: str, new_image: str):
    """Rolling deploy: update task definition image → force new deployment."""

    # 1. Get current task definition
    svc = ecs.describe_services(cluster=cluster, services=[service])["services"][0]
    current_td_arn = svc["taskDefinition"]
    td = ecs.describe_task_definition(taskDefinition=current_td_arn)["taskDefinition"]

    # 2. Update the image in the container definition
    containers = td["containerDefinitions"]
    for c in containers:
        if c["name"] == "app":
            c["image"] = new_image    # e.g. "123456.dkr.ecr.../my-app:v2.0.1"

    # 3. Register new task definition revision
    #    Copy optional fields only when present: a task definition without a
    #    task role or task-level cpu/memory would otherwise raise KeyError
    optional = ("taskRoleArn", "executionRoleArn", "networkMode",
                "requiresCompatibilities", "cpu", "memory", "volumes")
    new_td = ecs.register_task_definition(
        family=td["family"],
        containerDefinitions=containers,
        **{k: td[k] for k in optional if k in td},
    )
    new_td_arn = new_td["taskDefinition"]["taskDefinitionArn"]

    # 4. Update service with new task definition → rolling deploy begins
    ecs.update_service(
        cluster=cluster, service=service,
        taskDefinition=new_td_arn,
        forceNewDeployment=True,
        deploymentConfiguration={
            "maximumPercent": 200,        # allow double capacity during rollout
            "minimumHealthyPercent": 100, # never go below 100% healthy
        }
    )
    print(f"Deploying {new_td_arn} to {service}")

deploy_new_version("production", "api-service", "123.dkr.ecr.us-east-1.amazonaws.com/api:v2.1")
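
The two `deploymentConfiguration` percentages translate into hard limits on how many tasks may run while the rollout is in progress. The helper below is a sketch of that arithmetic. `rollout_bounds` is a hypothetical name, and the rounding (lower limit up, upper limit down) follows the ECS documentation as understood here, so verify it against your own service.

```python
def rollout_bounds(desired: int, minimum_healthy_percent: int, maximum_percent: int) -> tuple[int, int]:
    """Return (min running tasks, max running tasks) allowed during a deployment."""
    # Integer arithmetic avoids float rounding: ceil for the floor, floor for the cap
    lower = -(-desired * minimum_healthy_percent // 100)
    upper = desired * maximum_percent // 100
    return lower, upper

# Settings used above: 100% / 200% with 4 desired tasks
print(rollout_bounds(4, 100, 200))   # ECS may start 4 new tasks before stopping old ones
# Tighter capacity: 50% / 150% with 3 desired tasks
print(rollout_bounds(3, 50, 150))
```

With 100% / 200% the service never dips below its desired count but briefly needs double the capacity. Lowering both values trades deployment speed and safety for a smaller footprint.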

IAM, Secrets Manager, KMS, WAF

Security is AWS's top priority — and your responsibility under the shared responsibility model. These are the non-negotiables.

🔑 IAM — Least Privilege is the Law

  • Never use root — create an admin IAM user, lock root with MFA
  • Use roles, not users — EC2/Lambda/ECS use IAM roles, not access keys
  • SCPs (Org level) — deny entire regions or dangerous actions across all accounts
  • Permission boundaries — cap max permissions a role can have
  • Access Analyzer — find over-permissioned policies and external access
  • Credential rotation — set Secrets Manager to auto-rotate every 30 days
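
Least privilege is easier to enforce when policies are checked mechanically. The sketch below flags `Allow` statements that use wildcard actions or resources. It is a simplified illustration, not a replacement for Access Analyzer: `find_wildcards` is a hypothetical helper that only inspects `Action` and `Resource` and ignores conditions, `NotAction`, and partial wildcards inside ARNs.

```python
import json

def find_wildcards(policy_json: str) -> list[str]:
    """Return a warning for each Allow statement with a '*' action or resource."""
    statements = json.loads(policy_json)["Statement"]
    if isinstance(statements, dict):          # a single statement may be a bare object
        statements = [statements]
    warnings = []
    for idx, stmt in enumerate(statements):
        if stmt.get("Effect") != "Allow":
            continue
        actions = stmt.get("Action", [])
        resources = stmt.get("Resource", [])
        actions = [actions] if isinstance(actions, str) else actions
        resources = [resources] if isinstance(resources, str) else resources
        for action in actions:
            if action == "*" or action.endswith(":*"):
                warnings.append(f"statement {idx}: broad action {action!r}")
        for resource in resources:
            if resource == "*":
                warnings.append(f"statement {idx}: resource '*'")
    return warnings

broad = json.dumps({"Version": "2012-10-17", "Statement": [
    {"Effect": "Allow", "Action": "s3:*", "Resource": "*"}]})
narrow = json.dumps({"Version": "2012-10-17", "Statement": [
    {"Effect": "Allow", "Action": ["s3:GetObject"],
     "Resource": "arn:aws:s3:::my-bucket/uploads/*"}]})

print(find_wildcards(broad))
print(find_wildcards(narrow))
```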

🔐 Secrets Manager vs SSM Parameter Store

Feature | Secrets Manager | SSM Param Store
Auto rotation | ✓ Built-in | ✗ Manual
DB integration | ✓ RDS/Redshift | ✗ No
Cost | $0.40/secret/mo | Free tier
Config values | ✗ Overkill | ✓ Great fit
Encryption | ✓ KMS default | ✓ SecureString
Python · IAM policy + Secrets Manager rotation
import boto3, json

iam = boto3.client("iam")
sm  = boto3.client("secretsmanager")

# ── Attach a least-privilege policy to a role ──
policy_doc = json.dumps({
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "s3:GetObject",
            "s3:PutObject"
        ],
        "Resource": "arn:aws:s3:::my-bucket/uploads/*"   # narrow scope!
    }, {
        "Effect": "Allow",
        "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
        "Resource": "arn:aws:sqs:us-east-1:123456789:my-queue"
    }]
})

policy = iam.create_policy(
    PolicyName="AppLeastPrivilege",
    PolicyDocument=policy_doc
)
iam.attach_role_policy(
    RoleName="my-app-role",
    PolicyArn=policy["Policy"]["Arn"]
)

# ── Read a secret (the right way in app code) ──
def get_secret(secret_name: str) -> dict:
    response = sm.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

db_creds = get_secret("prod/postgres/main")
# {"username": "app_user", "password": "...", "host": "...", "port": 5432}

# ── Enable auto-rotation for a database secret ──
sm.rotate_secret(
    SecretId="prod/postgres/main",
    RotationLambdaARN="arn:aws:lambda:us-east-1:123:function:SecretsManagerRotation",
    RotationRules={"AutomaticallyAfterDays": 30}
)

SQS, SNS, EventBridge

Decouple services so they don't need to talk to each other directly. If one goes down, messages queue up — nothing is lost.

[Diagram: a Producer sends messages to an SQS Queue (retention: up to 14 days), which is read by Consumer 1 and Consumer 2. An SNS Topic (pub/sub fanout, all subscribers get each message) delivers to SQS Queue A, SQS Queue B, and Lambda / Email. Failed messages go to a Dead Letter Queue (DLQ).]
Python · SQS producer + consumer with DLQ
import boto3, json

sqs = boto3.client("sqs", region_name="us-east-1")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"

# ── Send a message ──
sqs.send_message(
    QueueUrl=QUEUE_URL,
    MessageBody=json.dumps({"job_id": "abc123", "type": "process_image"}),
    MessageAttributes={
        "priority": {"StringValue": "high", "DataType": "String"}
    }
)

# ── Send batch (up to 10 messages, 256KB total) ──
sqs.send_message_batch(
    QueueUrl=QUEUE_URL,
    Entries=[{"Id": str(i), "MessageBody": json.dumps({"item": i})} for i in range(10)]
)

# ── Consumer (long polling — cheaper than short polling) ──
def process_queue():
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10,   # batch up to 10
            WaitTimeSeconds=20,       # long poll — up to 20s wait
            VisibilityTimeout=300,    # 5 min to process before requeue
        )
        for msg in response.get("Messages", []):
            try:
                body = json.loads(msg["Body"])
                process_job(body)
                # Delete on success — removes from queue
                sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
            except Exception as e:
                print(f"Failed: {e}")
                # Don't delete → message returns to queue after VisibilityTimeout
                # After maxReceiveCount → moves to DLQ automatically

# ── SNS fanout: one publish → many subscribers ──
sns = boto3.client("sns")
sns.publish(
    TopicArn="arn:aws:sns:us-east-1:123456789:order-events",
    Subject="OrderPlaced",
    Message=json.dumps({"order_id": "ord_789", "amount": 59.99}),
)
# All subscribers (SQS queues, Lambda, email) receive this simultaneously
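
The retry-then-DLQ behaviour described in the consumer comments can be simulated without AWS. In this sketch a failed message goes back onto an in-memory queue, which stands in for the visibility timeout expiring, and moves to a dead-letter list once it has been received `max_receive_count` times. `drain` and `flaky_handler` are hypothetical names, and real SQS timing and ordering are not modelled.

```python
from collections import deque

def drain(messages, handler, max_receive_count=3):
    """Deliver each message until the handler succeeds or the receive limit is hit.

    Returns (processed, dead_letter).
    """
    queue = deque((m, 0) for m in messages)      # (message, receive count)
    processed, dead_letter = [], []
    while queue:
        msg, receives = queue.popleft()
        receives += 1
        try:
            handler(msg)
            processed.append(msg)                # success: "delete" the message
        except Exception:
            if receives >= max_receive_count:
                dead_letter.append(msg)          # redrive to the DLQ
            else:
                queue.append((msg, receives))    # becomes visible again later
    return processed, dead_letter

def flaky_handler(msg):
    if msg.get("poison"):
        raise ValueError("cannot process")

done, dlq = drain([{"id": 1}, {"id": 2, "poison": True}, {"id": 3}], flaky_handler)
print(done, dlq)
```

A poison message costs `max_receive_count` processing attempts before it is parked, which is why the DLQ should be monitored instead of silently accumulating.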

CloudWatch, X-Ray, Cost Explorer

You can't operate what you can't see. Instrument everything from day one.

📊 CloudWatch

Collect metrics, stream logs, set alarms, build dashboards — all in one place.

import boto3
from datetime import datetime, timedelta

cw = boto3.client("cloudwatch")

# Publish custom metric
cw.put_metric_data(
    Namespace="MyApp",
    MetricData=[{
        "MetricName": "OrdersProcessed",
        "Value": 42,
        "Unit": "Count",
        "Dimensions": [{"Name": "Environment", "Value": "production"}]
    }]
)

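# Create alarm: alert when targets return more than 50 5xx responses
# in each of 2 consecutive 5-minute windows
cw.put_metric_alarm(
    AlarmName="HighErrorRate",
    MetricName="HTTPCode_Target_5XX_Count",
    Namespace="AWS/ApplicationELB",
    Dimensions=[{"Name": "LoadBalancer", "Value": "app/my-alb/1234567890abcdef"}],  # placeholder ALB
    Statistic="Sum",
    Period=300,       # 5 minute windows
    EvaluationPeriods=2,
    Threshold=50,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123:ops-alerts"]
)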

# Query logs with Insights
logs = boto3.client("logs")
logs.start_query(
    logGroupName="/ecs/my-app",
    startTime=int((datetime.now() - timedelta(hours=1)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString="fields @timestamp, @message | filter @message like /ERROR/ | limit 100"
)
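
How `Period`, `EvaluationPeriods`, and `Threshold` interact in the alarm above is easier to see on a list of datapoints. The function below is a simplified model, assuming one datapoint per period, no missing data, and that every evaluated period must breach (that is, DatapointsToAlarm equals EvaluationPeriods). `alarm_state` is a hypothetical name, not a CloudWatch API.

```python
def alarm_state(datapoints: list[float], threshold: float, evaluation_periods: int) -> str:
    """ALARM when the most recent `evaluation_periods` datapoints all exceed the threshold."""
    if len(datapoints) < evaluation_periods:
        return "INSUFFICIENT_DATA"
    recent = datapoints[-evaluation_periods:]
    return "ALARM" if all(value > threshold for value in recent) else "OK"

# 5xx counts per 5-minute window, oldest first
print(alarm_state([10, 60, 70], threshold=50, evaluation_periods=2))
print(alarm_state([60, 70, 10], threshold=50, evaluation_periods=2))
print(alarm_state([60], threshold=50, evaluation_periods=2))
```

Requiring two consecutive breaches filters out single-window spikes at the price of a later alert, here at least 10 minutes after the problem starts.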

💸 Cost Management

AWS bills are surprisingly complex. These tools keep costs under control.

  • AWS Budgets — alert at 80%/100% of monthly budget
  • Cost Explorer — visualise costs by service/tag/account
  • Trusted Advisor — finds idle resources, oversized instances
  • Savings Plans — commit to $X/hr, save up to 66%. Auto-applied.
  • Resource tagging — tag everything: `env`, `team`, `project` for cost allocation
  • Spot for batch — ML training, CI/CD, data processing = up to 90% savings
⚠️

NAT Gateway is the #1 surprise bill. Each AZ's NAT Gateway costs $0.045/hr + $0.045/GB. VPC Endpoints for S3/DynamoDB eliminate NAT costs for those services.
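
The NAT Gateway figures above add up quickly once multiplied out. This sketch uses only the two prices quoted in the callout and a 730-hour month, which is an approximation. `nat_monthly_cost` is a hypothetical helper, and charges billed separately from the NAT Gateway, such as internet egress, are not included.

```python
HOURS_PER_MONTH = 730
NAT_HOURLY = 0.045        # USD per NAT Gateway hour (figure quoted above)
NAT_PER_GB = 0.045        # USD per GB processed (figure quoted above)

def nat_monthly_cost(az_count: int, gb_processed: float) -> float:
    """One NAT Gateway per AZ, plus per-GB processing on all traffic."""
    fixed = az_count * NAT_HOURLY * HOURS_PER_MONTH
    variable = gb_processed * NAT_PER_GB
    return round(fixed + variable, 2)

# Three AZs: idle, and with 10 TB of traffic through the gateways
print(nat_monthly_cost(3, 0))
print(nat_monthly_cost(3, 10_000))
```

Any share of that traffic going to S3 or DynamoDB can bypass the per-GB charge by using gateway VPC Endpoints instead.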

Which AWS Service Should I Use?

🗄️

Store a file / image / video

Use S3. Add CloudFront CDN in front for global delivery. Use pre-signed URLs for private access.

⚙️

Run a web server / API

Use ECS Fargate (Docker) for always-on, or Lambda + API Gateway for sporadic traffic.

🐘

Need a relational database

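Use Aurora Serverless v2 for most apps. It auto-scales with load. Compute drops to $0 only while the cluster is auto-paused (available on newer engine versions); otherwise budget for the ~$43/month minimum at 0.5 ACU. Storage is billed either way.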

Need millisecond lookups at scale

Use DynamoDB with on-demand pricing. Design your key schema around access patterns first.

📬

Decouple two services

Use SQS for point-to-point, SNS for fanout, EventBridge for event routing rules.

🔑

Store API keys / DB passwords

Use Secrets Manager with auto-rotation. Never put credentials in code, environment variables, or S3.

🌍

Global low-latency delivery

Use CloudFront (CDN) + Route 53 (latency-based routing) + DynamoDB Global Tables.

🤖

Run ML training / GPU workload

Use EC2 Spot p4d/g5 instances (up to 90% savings) or SageMaker for managed training pipelines.

🔍

Debug production issues

Use CloudWatch Logs Insights for log queries + X-Ray for distributed tracing across services.

🏗️

Well-Architected Framework — 6 pillars: Operational Excellence · Security · Reliability · Performance Efficiency · Cost Optimization · Sustainability. Run the AWS Well-Architected Tool (free) on your architecture before going to production.

Part III

🦜 LangChain & 🕸️ LangGraph

The most widely-used LLM application frameworks — from simple chains to stateful multi-agent systems with persistent memory and human-in-the-loop.

LangChain
Chains · Retrievers · Agents · LCEL
LangGraph
Stateful Graphs · Memory · Multi-Agent
LangSmith
Tracing · Eval · Monitoring

LangChain — The Mental Model

LangChain is a framework for building applications powered by LLMs. It provides primitives for connecting models, data, tools, and memory into pipelines. Everything composes.

[Diagram: LangChain building blocks]

  • MODELS — ChatOpenAI, ChatAnthropic, OllamaLLM
  • PROMPTS — ChatPromptTemplate, FewShotPromptTemplate, MessagesPlaceholder
  • RETRIEVAL — VectorStoreRetriever, EnsembleRetriever, MultiQueryRetriever
  • TOOLS — TavilySearch, PythonREPLTool, @tool decorator
  • MEMORY — ConversationBuffer, RedisChatHistory, SummaryMemory
  • OUTPUT — StrOutputParser, JsonOutputParser, PydanticOutputParser

Connected via LCEL pipe operator: prompt | model | output_parser
Bash · LangChain installation + provider setup
# Core packages
pip install langchain langchain-openai langchain-anthropic langchain-community
pip install langchain-chroma langchain-qdrant langsmith

# Check versions
python -c "import langchain; print(langchain.__version__)"
Python · Model initialization — OpenAI, Anthropic, Ollama
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_community.chat_models import ChatOllama

# OpenAI
gpt4o = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)

# Anthropic Claude
claude = ChatAnthropic(model="claude-3-7-sonnet-20250219", temperature=0)

# Local model via Ollama (no API cost). ChatOllama returns message objects like
# the other two; the plain Ollama LLM class returns a bare string instead.
llama = ChatOllama(model="llama3.1:8b")

# All three share the same chat-model interface — swap freely
response = gpt4o.invoke("What is RAG?")
print(response.content)

# Streaming
for chunk in gpt4o.stream("Explain LangChain in 3 sentences"):
    print(chunk.content, end="", flush=True)

LCEL — LangChain Expression Language

LCEL uses the | operator to compose runnables into pipelines. Everything is a Runnable — it has invoke, stream, batch, and ainvoke. Chains are lazy — nothing runs until you call invoke.

[Diagram: chain.invoke({"question": "..."}) → PromptTemplate (format_messages()) | ChatOpenAI (invoke(messages)) | StrOutputParser (extract .content) → "string response" (plain Python string)]
Python · LCEL — all the patterns you need
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel
from pydantic import BaseModel

llm    = ChatOpenAI(model="gpt-4o", temperature=0)
parser = StrOutputParser()

# ── Basic chain: prompt | model | parser ──
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant. Be concise."),
    ("human", "{question}")
])
chain = prompt | llm | parser
print(chain.invoke({"question": "What is LCEL?"}))

# ── Streaming ──
for chunk in chain.stream({"question": "Explain RAG in 3 steps"}):
    print(chunk, end="", flush=True)

# ── Batch (parallel) ──
results = chain.batch([
    {"question": "What is S3?"},
    {"question": "What is Lambda?"},
    {"question": "What is DynamoDB?"},
], config={"max_concurrency": 5})

# ── Parallel branches (run two things at once, merge) ──
parallel = RunnableParallel({
    "answer":   chain,
    "keywords": ChatPromptTemplate.from_template("List 5 keywords for: {question}") | llm | parser,
})
out = parallel.invoke({"question": "Explain GraphRAG"})
# out = {"answer": "...", "keywords": "..."}

# ── Structured output with Pydantic ──
class Movie(BaseModel):
    title: str
    year: int
    genre: str

structured_chain = prompt | llm.with_structured_output(Movie)
movie = structured_chain.invoke({"question": "Tell me about Inception"})
print(movie.title, movie.year)   # Inception  2010

# ── Fallbacks ──
fast_chain   = ChatPromptTemplate.from_template("{q}") | ChatOpenAI(model="gpt-4o-mini") | parser
strong_chain = ChatPromptTemplate.from_template("{q}") | ChatOpenAI(model="gpt-4o") | parser
chain_with_fallback = fast_chain.with_fallbacks([strong_chain])

# ── Pass-through + inject extra context ──
# `retriever` is any LangChain retriever, e.g. vectorstore.as_retriever()
rag_chain = (
    RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
    | ChatPromptTemplate.from_template("Context: {context}\n\nQ: {question}")
    | llm | parser
)
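
The `|` composition and the "nothing runs until invoke" behaviour can be mimicked in a few lines of plain Python, which may help demystify LCEL. This is a toy model only: `Step` is a made-up class, not LangChain's `Runnable`, and the "model" is a stand-in function that upper-cases its input instead of calling an API.

```python
class Step:
    """Toy runnable: wraps a function and composes with | (not LangChain's class)."""

    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new step; nothing executes until invoke() is called
        return Step(lambda x: other.invoke(self.invoke(x)))

    def invoke(self, x):
        return self.fn(x)

    def batch(self, inputs):
        return [self.invoke(x) for x in inputs]

prompt = Step(lambda d: f"Q: {d['question']}")
fake_llm = Step(lambda text: {"content": text.upper()})   # stand-in for a model call
parser = Step(lambda msg: msg["content"])

chain = prompt | fake_llm | parser            # composition only, no work done yet
print(chain.invoke({"question": "what is lcel?"}))
print(chain.batch([{"question": "a"}, {"question": "b"}]))
```

The real framework layers streaming, async variants, and concurrency on top, but the composition idea is the same: each stage's output becomes the next stage's input.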

Retrievers — Every Pattern You Need

A retriever accepts a string query and returns a list of Documents. LangChain has 50+ built-in retriever types. Here are the ones that matter.

🔍 VectorStore Retriever

The default — nearest-neighbour similarity search against your vector store (the distance metric, such as cosine or L2, is set by the store). Supports MMR (Maximum Marginal Relevance) for diversity.

from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

vectorstore = Chroma(
    collection_name="docs",
    embedding_function=OpenAIEmbeddings()
)

# Standard similarity
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

# MMR — diverse results (less redundancy)
mmr_retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)

# Similarity + score threshold
thresh_retriever = vectorstore.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.75, "k": 5}
)
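
To see what `lambda_mult` does in the MMR retriever, here is a minimal pure-Python version of the selection loop on toy 2-D vectors. It is a sketch of the general MMR idea, not LangChain's internal code: the scoring formula in the docstring is the common formulation, and `cosine`, `mmr`, and the vectors are all illustrative.

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def mmr(query, candidates, k=2, lambda_mult=0.5):
    """Return indices of k candidates, trading relevance against redundancy.

    score = lambda_mult * sim(query, doc) - (1 - lambda_mult) * max sim(doc, selected)
    """
    selected = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:
        def score(i):
            relevance = cosine(query, candidates[i])
            redundancy = max((cosine(candidates[i], candidates[j]) for j in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy
        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected

query = [1.0, 0.0]
docs = [[1.0, 0.1], [1.0, 0.12], [0.8, -0.6]]   # docs 0 and 1 are near-duplicates

print(mmr(query, docs, k=2, lambda_mult=1.0))   # pure relevance keeps both duplicates
print(mmr(query, docs, k=2, lambda_mult=0.5))   # balanced: skips the duplicate
```

With `lambda_mult=1.0` the redundancy penalty vanishes and the two near-identical documents are returned. At `0.5` the second pick is the less similar but still relevant document.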

🔀 MultiQueryRetriever

Auto-generates N query variants with an LLM, retrieves for each, deduplicates. Improves recall on vague queries.

from langchain.retrievers.multi_query import MultiQueryRetriever

mq_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
    include_original=True  # keep original query too
)

# Automatically generates:
# "What is retrieval augmented generation?"
# "How does RAG work?"
# "Explain the RAG architecture"
# → deduplicates results → returns unique docs
docs = mq_retriever.invoke("Tell me about RAG")

🏠 ParentDocument Retriever

Indexes small child chunks for high-precision search, but returns the full parent document for rich context.

from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Child chunks: small = good search precision
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
# Parent chunks: large = rich LLM context
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)

retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=InMemoryStore(),    # swap for Redis in production
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)
retriever.add_documents(docs)
# Retrieves small child → returns big parent
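
The "search small, return big" idea can be shown without any vector store. In this sketch, fixed-size character chunks play the role of child chunks and keyword overlap stands in for embedding similarity. Both are simplifying assumptions, and `split`, `child_index`, and `retrieve_parent` are made-up names.

```python
import re

def split(text: str, size: int) -> list[str]:
    return [text[i:i + size] for i in range(0, len(text), size)]

def tokens(text: str) -> set[str]:
    return set(re.findall(r"\w+", text.lower()))

parents = {
    "p1": "RAG retrieves documents before generation. It grounds answers in sources.",
    "p2": "Fine-tuning changes model weights. It needs training data and compute.",
}

# Index small child chunks, each remembering the id of its parent
child_index = [(chunk, pid) for pid, text in parents.items() for chunk in split(text, 40)]

def retrieve_parent(query: str) -> str:
    """Match against child chunks, then return the full parent document."""
    query_tokens = tokens(query)
    _, parent_id = max(child_index, key=lambda item: len(query_tokens & tokens(item[0])))
    return parents[parent_id]

print(retrieve_parent("model weights"))
print(retrieve_parent("grounds answers"))
```

The match happens on a 40-character chunk, but the caller receives the whole parent text, which is what gives the LLM enough surrounding context.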

🔗 Ensemble Retriever (Hybrid)

Combines dense vector search with sparse BM25 keyword search using Reciprocal Rank Fusion. Best of both worlds.

from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

# BM25 — keyword/lexical matching (needs: pip install rank_bm25)
bm25 = BM25Retriever.from_documents(docs)
bm25.k = 5

# Dense — semantic matching
dense = vectorstore.as_retriever(search_kwargs={"k": 5})

# 60% dense + 40% BM25 — RRF fusion
hybrid = EnsembleRetriever(
    retrievers=[dense, bm25],
    weights=[0.6, 0.4]
)
# Excels when users search by exact terms AND by meaning
result = hybrid.invoke("faiss cosine similarity ANN index")
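
Weighted Reciprocal Rank Fusion itself is only a few lines. The sketch below fuses two ranked lists of document ids using the 0.6 / 0.4 weights from the example above. The constant `c=60` is a commonly used default and is an assumption here, and `weighted_rrf` is an illustrative function rather than the library's implementation.

```python
from collections import defaultdict

def weighted_rrf(rankings, weights, c=60):
    """Fuse ranked lists of doc ids: score(d) = sum_i weight_i / (c + rank_i(d))."""
    scores = defaultdict(float)
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (c + rank)
    return sorted(scores, key=lambda d: scores[d], reverse=True)

dense_hits = ["d1", "d2", "d3"]   # semantic ranking
bm25_hits = ["d3", "d1", "d4"]    # keyword ranking
fused = weighted_rrf([dense_hits, bm25_hits], weights=[0.6, 0.4])
print(fused)
```

Documents found by both retrievers (`d1`, `d3`) rise above those found by only one. Because only ranks are used, the incomparable raw scores of BM25 and vector search never need to be normalised.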
Python · Full RAG chain with retriever + history-aware rewrite
from langchain_openai import ChatOpenAI
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Step 1: Rewrite query using chat history ("it" → resolve pronoun)
contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", "Rewrite the user question to be standalone, using the chat history if needed."),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
])
history_aware_retriever = create_history_aware_retriever(llm, retriever, contextualize_prompt)

# Step 2: Answer with retrieved docs
qa_prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer using only this context:\n\n{context}"),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
])
question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)

# Step 3: Compose into conversational RAG chain
rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

# Step 4: Add session-scoped memory
store = {}  # session_id → ChatMessageHistory
def get_session_history(session_id: str):
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

conversational_rag = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
    output_messages_key="answer",
)

# Multi-turn conversation — "it" resolved from history
r1 = conversational_rag.invoke({"input": "What is RAG?"}, config={"configurable": {"session_id": "u1"}})
r2 = conversational_rag.invoke({"input": "How is it different from fine-tuning?"}, config={"configurable": {"session_id": "u1"}})
print(r2["answer"])

Essential Chain Patterns

📝 Summarization

from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Map-Reduce: summarise each chunk → combine summaries
chain = load_summarize_chain(
    llm,
    chain_type="map_reduce",   # or "stuff" (all at once), "refine"
    verbose=True
)

splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=400)
docs = splitter.create_documents([very_long_text])
summary = chain.invoke(docs)
print(summary["output_text"])

🗃️ SQL Chain

from langchain_community.utilities import SQLDatabase
from langchain.chains import create_sql_query_chain

db = SQLDatabase.from_uri("postgresql://user:pass@host/mydb")

# Natural language → SQL → execute (db.run returns raw rows; pass them back to the LLM for a natural-language answer)
sql_chain = create_sql_query_chain(llm, db)
query = sql_chain.invoke({"question": "How many orders were placed last month?"})
# Generates: SELECT COUNT(*) FROM orders WHERE created_at > NOW() - INTERVAL '1 month'
result = db.run(query)

🔍 Self-Querying Retriever

from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo

# LLM writes the metadata filter automatically from natural language
metadata_field_info = [
    AttributeInfo(name="source", description="The PDF file name", type="string"),
    AttributeInfo(name="year",   description="The publication year", type="integer"),
    AttributeInfo(name="topic",  description="Main topic", type="string"),
]
retriever = SelfQueryRetriever.from_llm(
    llm, vectorstore, "Research papers on AI", metadata_field_info
)
# "Papers about RAG published after 2023" →
# filter: {"year": {"$gt": 2023}, "topic": "RAG"}
docs = retriever.invoke("Papers about RAG published after 2023")

🧮 Router Chain

from langchain_core.runnables import RunnableLambda

# Route to different chains based on query type
rag_chain    = build_rag_chain()
sql_chain    = build_sql_chain()
general_chain = prompt | llm | parser

def route(info: dict):
    question = info["question"].lower()
    if any(w in question for w in ["database", "table", "sql", "query"]):
        return sql_chain
    elif any(w in question for w in ["document", "pdf", "report"]):
        return rag_chain
    return general_chain

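router = RunnableLambda(route)   # route() returns a runnable, which is then invoked with the same input
full_chain = router              # input is already {"question": ...}, so no extra wrapping is needed
answer = full_chain.invoke({"question": "What's in the Q3 report?"})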

LangChain Agents

Agents let the LLM decide which tools to call and in what order. The model acts in a loop: Thought → Action → Observation → Thought → … → Final Answer.
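
The loop itself is small. The sketch below replays it with a scripted stand-in for the model, so it runs without an API key; `fake_llm`, `TOOLS`, and `run_agent` are hypothetical names, and a real agent would replace `fake_llm` with an LLM call that parses the model's chosen action.

```python
def fake_llm(history):
    """Stand-in for a model: choose the next step from what has been observed so far."""
    if not history:
        return ("action", "add", (2, 3))                        # Thought → Action
    return ("final", f"The answer is {history[-1][1]}", None)   # Final Answer

TOOLS = {"add": lambda a, b: a + b}

def run_agent(max_iterations=5):
    history = []
    for _ in range(max_iterations):            # safety cap, like max_iterations in AgentExecutor
        kind, payload, args = fake_llm(history)
        if kind == "final":
            return payload
        observation = TOOLS[payload](*args)    # execute the chosen tool
        history.append(("observation", observation))   # Observation is fed back to the model
    return "Stopped: iteration cap reached"

print(run_agent())
```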

Python · Building tools + ReAct agent
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub
from langchain.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults

# ── Define custom tools with @tool decorator ──
@tool
def calculate_compound_interest(principal: float, rate: float, years: int) -> str:
    """Calculate compound interest. Args: principal (USD), rate (annual %), years."""
    amount = principal * (1 + rate / 100) ** years
    return f"${amount:,.2f} after {years} years"

@tool
def get_stock_price(ticker: str) -> str:
    """Get the current stock price for a given ticker symbol."""
    # In production: call a real finance API
    prices = {"AAPL": 189.30, "GOOGL": 173.50, "AMZN": 185.20}
    return f"{ticker}: ${prices.get(ticker.upper(), 'Not found')}"

@tool
def search_knowledge_base(query: str) -> str:
    """Search internal knowledge base for company-specific information."""
    docs = retriever.invoke(query)
    return "\n".join(d.page_content for d in docs[:3])

# ── Create agent ──
tools = [
    TavilySearchResults(max_results=3),
    calculate_compound_interest,
    get_stock_price,
    search_knowledge_base,
]

prompt = hub.pull("hwchase17/react")   # standard ReAct prompt
agent  = create_react_agent(llm=ChatOpenAI(model="gpt-4o", temperature=0), tools=tools, prompt=prompt)

executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,          # shows Thought / Action / Observation loop
    max_iterations=10,     # safety cap — prevents infinite loops
    handle_parsing_errors=True,
    early_stopping_method="force",   # "generate" is only supported by legacy (non-runnable) agents
)

# Agent decides: search web → use calculator → search KB → synthesise
result = executor.invoke({
    "input": "If I invest $10,000 in AAPL today at their historical 15% annual growth, what do I have in 10 years?"
})
print(result["output"])

🔧 LangSmith tracing: Set LANGCHAIN_TRACING_V2=true and LANGCHAIN_API_KEY=ls__... in your env. Every chain call, token count, latency, and agent step is logged to the LangSmith dashboard, which is indispensable for debugging agents.

LangGraph — Why Graphs Beat Chains

LangChain chains are linear (A→B→C). LangGraph adds cycles, state, branching, and persistence — the four things real AI systems need.

❌ When LCEL Chains Break Down

  • Agent needs to loop back (retry, refine): chains are one-way
  • Need human approval mid-execution: chains can't pause
  • Multiple parallel agents need to coordinate: a chain is a single linear flow
  • State must persist across sessions: chains are stateless by default
  • Need to resume after failure: chains restart from scratch

✅ LangGraph Solutions

  • Cycles — edges can loop back to any previous node
  • Interrupt/Resume — pause at any node for human approval
  • Parallel nodes — run multiple agents simultaneously, fan-out/fan-in
  • Checkpointers — SQLite/Redis/Postgres persistence between runs
  • Time travel — replay execution from any past checkpoint

[Diagram: START → Agent (LLM decides action) → Tools (execute action) or Human (approve / reject, when a tool call needs approval) → END when done. Tool observations loop back to the Agent. A checkpointer (SQLite / Redis / Postgres) saves state after every node step.]

Graph Basics — State, Nodes, Edges

Every LangGraph app has three parts: a State (TypedDict that flows through the graph), Nodes (Python functions that update state), and Edges (connections, including conditional branches).

Python · LangGraph hello-world — complete working example
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# ── 1. Define State ──
# Annotated[list, operator.add] means: each node APPENDS to messages (not replaces)
class State(TypedDict):
    messages: Annotated[list, operator.add]
    query: str
    documents: list
    answer: str

# ── 2. Define Nodes (each is a plain Python function) ──
def retrieve(state: State) -> State:
    """Retrieve relevant documents."""
    docs = retriever.invoke(state["query"])
    return {"documents": docs}

def generate(state: State) -> State:
    """Generate answer from retrieved docs."""
    context = "\n".join(d.page_content for d in state["documents"])
    response = llm.invoke([
        HumanMessage(content=f"Context:\n{context}\n\nQuestion: {state['query']}")
    ])
    return {
        "answer": response.content,
        "messages": [AIMessage(content=response.content)]
    }

def grade_answer(state: State) -> str:
    """Conditional edge: route based on answer quality."""
    # Simple heuristic — in production use an LLM grader
    if len(state["answer"]) < 50 or "I don't know" in state["answer"]:
        return "retry"   # loops back to retrieve; the query is unchanged here, so add a rewrite step and a retry cap in real use
    return "done"

# ── 3. Build Graph ──
builder = StateGraph(State)

builder.add_node("retrieve", retrieve)
builder.add_node("generate", generate)

# Edges
builder.add_edge(START, "retrieve")
builder.add_edge("retrieve", "generate")

# Conditional edge: done → END, retry → retrieve (loop!)
builder.add_conditional_edges(
    "generate",
    grade_answer,
    {"done": END, "retry": "retrieve"}
)

# ── 4. Compile with checkpointer (enables persistence + interrupt) ──
memory = MemorySaver()   # in-memory; swap for SqliteSaver / RedisSaver in prod
graph  = builder.compile(checkpointer=memory)

# ── 5. Run with thread_id (each thread = isolated conversation) ──
config = {"configurable": {"thread_id": "user-123-session-1"}}
result = graph.invoke({"query": "What is CRAG?", "messages": [], "documents": [], "answer": ""}, config)
print(result["answer"])

# Resume same thread later: the checkpointer restores prior state, so pass only the new query
# (re-sending result["messages"] would append them a second time via operator.add)
result2 = graph.invoke({"query": "How does it compare to Self-RAG?"}, config)
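
To make the `Annotated[list, operator.add]` behaviour concrete, the sketch below models how a partial update from a node could be merged into state: keys with a reducer are combined, all other keys are overwritten. This is a simplified mental model, not LangGraph's actual implementation; `REDUCERS` and `apply_update` are hypothetical names.

```python
import operator

REDUCERS = {"messages": operator.add}   # keys without a reducer are simply overwritten

def apply_update(state, update):
    """Merge a node's partial update into the state, key by key."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS:
            merged[key] = REDUCERS[key](state.get(key, []), value)   # append
        else:
            merged[key] = value                                      # replace
    return merged

state = {"messages": ["hi"], "answer": ""}
state = apply_update(state, {"messages": ["hello!"], "answer": "hello!"})
print(state)
```

This is also why passing an already-persisted message list back in as input makes the history grow twice as fast.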
Python · Visualise the graph
# Print ASCII representation
graph.get_graph().print_ascii()

# Export as PNG (draw_mermaid_png calls the mermaid.ink web API by default, so it needs network access)
from IPython.display import Image
Image(graph.get_graph().draw_mermaid_png())

# Or get Mermaid markdown
print(graph.get_graph().draw_mermaid())

The 5 Essential LangGraph Patterns

🔁 Pattern 1: ReAct Agent Loop (Beginner)

The classic agent loop: reason → act → observe → repeat until done. Built into LangGraph as a prebuilt.

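from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

tavily = TavilySearchResults(max_results=3)   # needs TAVILY_API_KEY in the environment

@tool
def search(query: str) -> str:
    """Search the web for current information."""
    return str(tavily.invoke(query))   # results come back as a list; stringify for the model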

@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression."""
    return str(eval(expression))   # use numexpr in production

# create_react_agent is the fastest path — wraps the full graph
agent = create_react_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=[search, calculator],
    checkpointer=MemorySaver(),   # memory across turns
    prompt="You are a research assistant. Use tools to answer accurately."
)

# Stream intermediate steps (agent reasoning is visible)
for event in agent.stream(
    {"messages": [HumanMessage(content="What's 15% of NVIDIA's current market cap?")]},
    config={"configurable": {"thread_id": "t1"}},
    stream_mode="values"
):
    event["messages"][-1].pretty_print()

🧑 Pattern 2: Human-in-the-Loop (Intermediate)

Pause execution before a dangerous action (send email, delete record, execute code), wait for human approval, then resume.

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class ApprovalState(TypedDict):
    messages: Annotated[list, operator.add]
    action: str
    approved: bool

def plan_action(state):
    response = llm.invoke(state["messages"])
    return {"action": response.content, "messages": [response]}

def execute_action(state):
    # Only runs after human approval
    print(f"Executing: {state['action']}")
    result = dangerous_api_call(state["action"])
    return {"messages": [AIMessage(content=f"Done: {result}")]}

builder = StateGraph(ApprovalState)
builder.add_node("plan", plan_action)
builder.add_node("execute", execute_action)
builder.add_edge(START, "plan")
builder.add_edge("plan", "execute")   # interrupted before this
builder.add_edge("execute", END)

# interrupt_before=["execute"] — graph pauses BEFORE running "execute"
graph = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["execute"]   # ← the magic
)

config = {"configurable": {"thread_id": "approval-flow-1"}}

# Run until the interrupt
graph.invoke({"messages": [HumanMessage(content="Delete all staging data")]}, config)

# ↑ Graph pauses here. Show plan to human...
state = graph.get_state(config)
print("Pending action:", state.values["action"])

# Human approves — resume by passing None (continue from checkpoint)
# To reject, update state instead
graph.invoke(None, config)   # resumes from interrupt checkpoint

Pattern 3: Parallel Fan-Out / Fan-In (Intermediate)

Run multiple agents or research branches simultaneously, then merge results. LangGraph handles the synchronisation.

from langgraph.graph import StateGraph, START, END

class ResearchState(TypedDict):
    topic: str
    web_results: str
    db_results: str
    summary: str

def web_researcher(state):
    """Searches the web — runs in parallel with db_researcher."""
    results = tavily.invoke(state["topic"])
    return {"web_results": str(results)}

def db_researcher(state):
    """Queries internal vector DB — runs in parallel."""
    docs = retriever.invoke(state["topic"])
    return {"db_results": "\n".join(d.page_content for d in docs)}

def synthesiser(state):
    """Waits for both branches, then merges."""
    prompt = f"Web:\n{state['web_results']}\n\nInternal docs:\n{state['db_results']}\n\nSynthesise:"
    answer = llm.invoke([HumanMessage(content=prompt)])
    return {"summary": answer.content}

builder = StateGraph(ResearchState)
builder.add_node("web_researcher", web_researcher)
builder.add_node("db_researcher", db_researcher)
builder.add_node("synthesiser", synthesiser)

# Fan-out: START → both branches simultaneously
builder.add_edge(START, "web_researcher")
builder.add_edge(START, "db_researcher")

# Fan-in: both must complete before synthesiser runs
builder.add_edge("web_researcher", "synthesiser")
builder.add_edge("db_researcher", "synthesiser")
builder.add_edge("synthesiser", END)

graph = builder.compile()
result = graph.invoke({"topic": "LangGraph vs CrewAI", "web_results": "", "db_results": "", "summary": ""})
print(result["summary"])
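
The same fan-out/fan-in shape can be sketched with the standard library alone: start both branches, wait for both, then merge their partial updates. The branch functions and their return strings below are placeholders, and this illustrates the pattern rather than how LangGraph schedules nodes internally.

```python
from concurrent.futures import ThreadPoolExecutor

def web_branch(topic):
    return {"web_results": f"web notes on {topic}"}        # placeholder for a web search

def db_branch(topic):
    return {"db_results": f"internal notes on {topic}"}    # placeholder for a vector DB query

def fan_out_fan_in(topic):
    state = {"topic": topic}
    with ThreadPoolExecutor() as pool:
        # Fan-out: both branches start immediately
        futures = [pool.submit(branch, topic) for branch in (web_branch, db_branch)]
        # Fan-in: block until every branch has returned, merging each partial update
        for future in futures:
            state.update(future.result())
    state["summary"] = f"{state['web_results']} + {state['db_results']}"
    return state

result = fan_out_fan_in("LangGraph")
print(result["summary"])
```

Each branch writes a different key, so the merge needs no reducer; two branches writing the same key would need one.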

🗂️ Pattern 4: Sub-graphs (Advanced)

Compose complex graphs from smaller reusable graphs. A sub-graph is compiled independently and added as a node in a parent graph.

# Sub-graph: a reusable RAG pipeline
rag_builder = StateGraph(State)
rag_builder.add_node("retrieve", retrieve)
rag_builder.add_node("generate", generate)
rag_builder.add_edge(START, "retrieve")
rag_builder.add_edge("retrieve", "generate")
rag_builder.add_edge("generate", END)
rag_graph = rag_builder.compile()

# Parent graph uses the sub-graph as a node
parent_builder = StateGraph(ParentState)
parent_builder.add_node("classify",  classify_query)
parent_builder.add_node("rag",       rag_graph)        # ← sub-graph as node
parent_builder.add_node("sql_agent", sql_agent_graph)  # ← another sub-graph

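parent_builder.add_edge(START, "classify")   # entry point; compile() fails without one
parent_builder.add_conditional_edges(
    "classify",
    lambda s: s["query_type"],
    {"document": "rag", "database": "sql_agent"}
)
parent_builder.add_edge("rag", END)
parent_builder.add_edge("sql_agent", END)
parent_graph = parent_builder.compile()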

Memory — Short-Term, Long-Term, Semantic

LangGraph has a first-class memory system with two layers: thread-scoped memory (a checkpointer) for conversational context, and cross-thread memory (a store) for user profiles and facts.

Python · Persistent memory with SQLite + cross-thread semantic store
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.store.memory import InMemoryStore
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import SystemMessage
import json

# ── Thread-scoped memory (per conversation) ──
# SqliteSaver: persists to disk — survives restarts
db_path = "checkpoints.db"
with SqliteSaver.from_conn_string(db_path) as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "user-42-chat-7"}}
    graph.invoke({"messages": [HumanMessage("My name is Alice")]}, config)
    # Restart app — state still there!
    graph.invoke({"messages": [HumanMessage("What's my name?")]}, config)
    # → "Your name is Alice" ✓

# ── Cross-thread memory (user profile, facts) ──
store = InMemoryStore()   # swap for PostgresStore in production; wire it in with builder.compile(store=store)

def chatbot_with_memory(state, config, *, store):
    user_id = config["configurable"]["user_id"]
    namespace = ("users", user_id, "memories")

    # Recall existing memories for this user
    memories = store.search(namespace, query=state["messages"][-1].content, limit=3)
    memory_text = "\n".join(m.value["fact"] for m in memories) if memories else ""

    system = f"""You are a personal assistant.
User facts you know:
{memory_text}"""

    response = llm.invoke([SystemMessage(content=system)] + state["messages"])

    # Extract and save new facts mentioned in this message
    new_facts = extract_facts(state["messages"][-1].content)
    for fact in new_facts:
        store.put(namespace, key=fact[:50], value={"fact": fact})

    return {"messages": [response]}

# "My dog is called Max" → stored as fact
# Next session: "What's my dog's name?" → "Max" (retrieved from store)

Memory Type | Scope | Implementation | Use Case
In-context | Single response | messages list in state | Conversational context window
Thread (short-term) | One conversation | Checkpointer (SQLite/Redis) | Multi-turn chat, resume sessions
Cross-thread (long-term) | Across all chats | Store (Postgres/Pinecone) | User preferences, facts, profiles
Semantic | Cross-thread | Store + embedding search | "What did the user say about X?"

Multi-Agent Systems

Orchestrate multiple specialised agents — a Supervisor routes tasks to Worker agents, each with their own tools and expertise.

[Diagram: the User sends a task to the Supervisor, which routes to the best agent and aggregates results. Workers: Researcher (web + docs + RAG), Coder (write + execute code), Analyst (data + SQL + charts), Writer (final synthesis).]

Python · Supervisor multi-agent system
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from typing import Literal

# ── Create specialised worker agents ──
researcher = create_react_agent(
    ChatOpenAI(model="gpt-4o"), tools=[tavily_search, vectorstore_search],
    prompt="You are a research specialist. Find accurate information."
)

coder = create_react_agent(
    ChatOpenAI(model="gpt-4o"), tools=[python_repl, code_interpreter],
    prompt="You are a Python expert. Write clean, working code."
)

analyst = create_react_agent(
    ChatOpenAI(model="gpt-4o"), tools=[sql_tool, chart_tool],
    prompt="You are a data analyst. Query data and visualise insights."
)

# ── Supervisor state ──
class SupervisorState(TypedDict):
    messages: Annotated[list, operator.add]
    next_agent: str

# ── Supervisor node: LLM decides which agent to call next ──
SUPERVISOR_PROMPT = """You are a supervisor managing: researcher, coder, analyst, FINISH.
Given the conversation, decide which agent should act next, or FINISH if done.
Respond with just the agent name."""

def supervisor(state: SupervisorState) -> SupervisorState:
    response = llm.invoke([
        SystemMessage(content=SUPERVISOR_PROMPT),
        *state["messages"]
    ])
    return {"next_agent": response.content.strip()}

def route(state) -> Literal["researcher", "coder", "analyst", END]:
    next_a = state["next_agent"]
    if next_a == "FINISH": return END
    return next_a

# ── Build supervisor graph ──
builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor)
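def run_worker(agent):
    # Pass only the messages in, and append only the worker's final message:
    # returning the full list would duplicate history under operator.add
    return lambda s: {"messages": agent.invoke({"messages": s["messages"]})["messages"][-1:]}

builder.add_node("researcher", run_worker(researcher))
builder.add_node("coder",      run_worker(coder))
builder.add_node("analyst",    run_worker(analyst))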

builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", route)

# All workers report back to supervisor after each turn
for worker in ["researcher", "coder", "analyst"]:
    builder.add_edge(worker, "supervisor")

graph = builder.compile(checkpointer=MemorySaver())

# Complex task: supervisor orchestrates multiple agents automatically
result = graph.invoke({
    "messages": [HumanMessage(content=
        "Research LangGraph's architecture, write a Python example of a multi-agent system, "
        "and analyse what percentage of GitHub repos use LangChain vs LangGraph"
    )],
    "next_agent": ""
}, config={"configurable": {"thread_id": "complex-task-1"}})

🆚 LangGraph vs CrewAI

LangGraph = low-level graph control. CrewAI = high-level role-based abstraction. LangGraph wins on flexibility; CrewAI wins on onboarding speed.

🔍 LangSmith Debugging

Every node execution, token count, and latency is traced. Filter by thread_id, tag chains by use-case, compare runs side-by-side.

export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="ls__..."
export LANGCHAIN_PROJECT="my-agent"
# All runs now appear in LangSmith (smith.langchain.com)

🚀 LangGraph Platform

Deploy LangGraph graphs as production APIs. Built-in: horizontal scaling, streaming, cron jobs, webhooks, Studio UI for visual debugging.

# Deploy to LangGraph Cloud
pip install langgraph-cli
langgraph up     # local Studio
langgraph deploy # cloud deployment

📋 LangChain vs LangGraph — When to Use Which

Situation | Use | Why
Simple Q&A, summarisation, translation | LCEL Chain | Linear, no state needed
RAG with a single retrieval step | LCEL + Retriever | create_retrieval_chain covers it
Agent that uses tools in a loop | LangGraph ReAct | Cycles + state + interrupt support
Multi-turn chat with memory | LangGraph + Checkpointer | Thread-scoped persistence built-in
Human approval before action | LangGraph interrupt_before | LCEL can't pause mid-execution
Multiple agents collaborating | LangGraph Supervisor | Fan-out/fan-in, sub-graphs, coordination
Long-running background task | LangGraph Platform | Durable execution, webhooks, streaming

Part IV · The Complete Picture

High-End AI Professional Roadmap

Everything beyond RAG, AWS, and LangChain that separates an AI practitioner from a true AI architect — from mathematical foundations to emerging 2026 techniques.

The Complete AI Professional Stack

Think in layers. Each layer depends on the ones below it. Most practitioners skip foundations and wonder why they can't debug models or design architectures from first principles.

  • Layer 1 (Foundations): Linear Algebra (vectors · matrices · SVD); Probability & Stats (Bayes · MLE · distributions); Calculus & Optim. (gradients · chain rule · Adam); Python + NumPy (broadcasting · vectorisation); Info Theory (entropy · KL divergence · CE); Data Engineering (Airflow · Spark · dbt)
  • Layer 2 (Core ML/DL): Neural Networks (backprop · activations); Transformers (attention · BERT · GPT); Computer Vision (CNN · YOLO · ViT · SAM); Generative Models (Diffusion · VAE · GAN); Training at Scale (PyTorch · distributed · FSDP); Evaluation Science (benchmarks · HELM · evals)
  • Layer 3 (LLM Systems): Fine-Tuning (LoRA · QLoRA · SFT · DPO); Prompt Engineering (CoT · ToT · few-shot · DSPy); Agent Frameworks (LangGraph · AutoGen · CrewAI); RAG Systems (GraphRAG · Agentic · CRAG); Multimodal (vision · audio · video LLMs); AI Safety (RLHF · alignment · red-team)
  • Layer 4 (Production): Inference Optim. (vLLM · quant · speculative); MLOps (MLflow · W&B · DVC · Feast); Cloud AI Infra (AWS/GCP · SageMaker · k8s); Model Monitoring (drift · shadow deploy · A/B); AI Security (prompt injection · PII · guardrails); AI Product Design (UX for AI · ROI · ethics)
  • Layer 5 (Frontier): Test-Time Compute; MoE Architectures; State Space Models; AI Agents at Scale; World Models; AI + Robotics

✅ Already In This Guide

RAG (all variants) · Vector DBs · LangChain LCEL · LangGraph · AWS (12 services) · Embedding Models · Evaluation (RAGAS) · Agentic RAG · GraphRAG · Multi-Agent Systems · Multimodal RAG · Production RAG

🗺️ What This Roadmap Adds

Math Foundations · Transformer Internals · Fine-Tuning (LoRA/DPO) · Prompt Engineering · Inference Optimization · MLOps (W&B/MLflow) · AI Safety & Alignment · Diffusion Models · Computer Vision · Test-Time Compute · MoE / Mamba · AI Security

Mathematics for AI — The Non-Negotiables

You don't need a PhD, but you need enough math to read papers, understand what's actually happening inside models, and debug when things go wrong.

📐 Linear Algebra (Most Important)

import numpy as np

# Vectors = embeddings. Dot product = similarity.
v1 = np.array([0.2, 0.8, 0.5])   # "king" embedding
v2 = np.array([0.1, 0.9, 0.4])   # "queen" embedding
cosine_sim = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
print(f"Cosine similarity: {cosine_sim:.4f}")   # 0.9846

# Matrix multiplication = linear layer (weight matrix W applied to input x)
W = np.random.randn(768, 3072)   # FFN expand layer (GPT-2 style)
x = np.random.randn(768)          # token embedding
out = W.T @ x                     # → 3072-dim hidden state

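# SVD = how PCA and embedding compression work; LoRA builds on the same low-rank idea
U, S, Vt = np.linalg.svd(W, full_matrices=False)
# Keep top-r singular values = best rank-r approximation of W
# (LoRA does not run SVD; it learns a low-rank update B @ A directly)
r = 8
W_approx = U[:, :r] @ np.diag(S[:r]) @ Vt[:r, :]
# Rank-r factors store r*(m+n) numbers instead of m*n
m, n = W.shape
print(f"Rank-{r} compression: {W.shape} → saved {1 - r*(m+n)/(m*n):.1%} params")   # 98.7%

# Eigenvalues = used in PCA and attention-matrix analysis; W @ W.T is symmetric, so use eigh
eigenvalues, eigenvectors = np.linalg.eigh(W @ W.T)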

Vectors & dot product · Matrix multiply · SVD / eigendecomposition · Norms & projections · Rank & low-rank approx

🎲 Probability & Statistics

import torch
import torch.nn.functional as F

# Softmax = turning logits into probabilities (used everywhere)
logits = torch.tensor([2.0, 1.0, 0.1])
probs  = F.softmax(logits, dim=-1)
# tensor([0.6590, 0.2424, 0.0986]) — sums to 1

# Cross-entropy loss = how LLMs are trained (predict next token)
# True label = index 0 ("the" was the next word)
target = torch.tensor([0])
loss = F.cross_entropy(logits.unsqueeze(0), target)
print(f"Loss: {loss.item():.4f}")   # -log(0.659) = 0.417

# KL Divergence = how DPO/RLHF penalise diverging from reference model
p = torch.softmax(torch.tensor([3.0, 1.0, 0.5]), dim=0)  # policy
q = torch.softmax(torch.tensor([2.5, 1.2, 0.3]), dim=0)  # reference
kl = (p * (p / q).log()).sum()
print(f"KL(p||q) = {kl.item():.4f}")

# Temperature sampling (controls randomness of generation)
temp = 0.7
scaled_logits = logits / temp
probs_temp = F.softmax(scaled_logits, dim=-1)
# Lower temp → more deterministic. As temp → 0 this approaches greedy decoding (temp=0 itself would divide by zero).
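
The effect of temperature is easy to check by hand. The sketch below is a dependency-free version of the same computation; the helper name `softmax` and the chosen temperatures are illustrative assumptions.

```python
import math

def softmax(logits, temperature=1.0):
    """Softmax over logits / temperature, with the usual max-subtraction for stability."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.1]
for t in (2.0, 1.0, 0.5):
    # Lower temperature concentrates probability on the largest logit
    print(t, [round(p, 3) for p in softmax(logits, t)])
```

Dividing by a temperature below 1 widens the gaps between logits before the exponential, so the top token's share grows; a temperature above 1 flattens the distribution.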

∂ Calculus — Gradients & Backprop

import torch
import torch.nn.functional as F

# Automatic differentiation — how PyTorch computes gradients
x = torch.tensor(2.0, requires_grad=True)
y = x ** 3 + 2 * x          # y = x³ + 2x
y.backward()                  # compute dy/dx via chain rule
print(x.grad)                 # tensor(14.) = 3x² + 2 at x=2

# A simple neural network: forward → loss → backward → step
model = torch.nn.Linear(10, 1)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=0.01)

for step in range(100):
    x = torch.randn(32, 10)      # batch of 32
    y_true = torch.randn(32, 1)
    y_pred = model(x)
    loss = F.mse_loss(y_pred, y_true)
    optimizer.zero_grad()        # clear old gradients
    loss.backward()              # compute new gradients
    optimizer.step()             # update weights (plain SGD: w = w - lr * grad; AdamW adapts the step per parameter)

# Key optimizers to know:
# SGD → simple, good for vision models
# Adam → adaptive lr, great for NLP
# AdamW → Adam + weight decay → prevents overfitting (used for LLMs)
# Adafactor → memory-efficient, used for very large models
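
A quick way to build trust in autograd is to compare it against a finite-difference estimate. The sketch below checks the derivative of y = x³ + 2x at x = 2 numerically and then runs plain gradient descent on a toy quadratic; `numerical_grad`, the step size `h`, and the learning rate are illustrative choices, not part of PyTorch.

```python
def f(x):
    return x ** 3 + 2 * x

def numerical_grad(fn, x, h=1e-5):
    """Central-difference estimate of dfn/dx at x."""
    return (fn(x + h) - fn(x - h)) / (2 * h)

grad = numerical_grad(f, 2.0)
print(round(grad, 4))   # analytic value is 3x² + 2 = 14 at x = 2

# Plain gradient descent on (w - 3)²: w = w - lr * grad
w = 0.0
for _ in range(100):
    w -= 0.1 * numerical_grad(lambda v: (v - 3) ** 2, w)
print(round(w, 4))
```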

📚 Resources to Learn These

  • 3Blue1Brown — "Essence of Linear Algebra" + "Neural Networks" YouTube series. Best visual intuition available.
  • fast.ai — Practical Deep Learning for Coders. Top-down, code-first, free.
  • Andrej Karpathy — makemore + nanoGPT — Build a GPT from scratch in pure PyTorch. Best LLM intuition builder.
  • CS229 (Stanford) — ML theory, publicly available. Covers probability + optimisation rigorously.
  • Dive into Deep Learning (d2l.ai) — Free textbook with code. Covers everything from perceptrons to transformers.

Transformer Internals & Deep Learning Architecture

To architect and debug LLM systems at a professional level, you must understand what actually happens inside a transformer — not just the API.

[Diagram: "The cat sat" → Tokeniser (BPE / WordPiece) → token IDs [464, 3797, 3332] → Embedding (token + positional → [seq, d_model]) → N × Transformer Blocks (Multi-Head Attention: Q·K⊤/√d_k → softmax → ·V; LayerNorm + Residual; Feed-Forward Network: Linear → GELU → Linear with 4× expansion) → LM Head (Linear → softmax → vocab probs) → next token "on". The KV cache stores K and V per block so past tokens aren't recomputed.]
Python · Scaled dot-product attention from scratch
import torch
import torch.nn.functional as F
import math

def scaled_dot_product_attention(Q, K, V, mask=None):
    """
    The heart of every transformer.
    Q, K, V: [batch, heads, seq_len, head_dim]
    """
    d_k = Q.size(-1)

    # Attention scores: how much each token should attend to each other token
    scores = Q @ K.transpose(-2, -1) / math.sqrt(d_k)  # [B, H, seq, seq]

    # Causal mask: decoder can't see future tokens (autoregressive)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))

    # Convert scores to probabilities
    attn_weights = F.softmax(scores, dim=-1)

    # Weighted sum of values
    return attn_weights @ V, attn_weights   # [B, H, seq, head_dim]

class MultiHeadAttention(torch.nn.Module):
    def __init__(self, d_model=768, n_heads=12):
        super().__init__()
        self.n_heads  = n_heads
        self.d_head   = d_model // n_heads
        self.W_q = torch.nn.Linear(d_model, d_model, bias=False)
        self.W_k = torch.nn.Linear(d_model, d_model, bias=False)
        self.W_v = torch.nn.Linear(d_model, d_model, bias=False)
        self.W_o = torch.nn.Linear(d_model, d_model, bias=False)

    def forward(self, x, mask=None):
        B, T, C = x.shape   # batch, seq_len, d_model

        # Project → split into heads
        def split_heads(w): return w.view(B, T, self.n_heads, self.d_head).transpose(1, 2)
        Q, K, V = split_heads(self.W_q(x)), split_heads(self.W_k(x)), split_heads(self.W_v(x))

        # Attention
        out, weights = scaled_dot_product_attention(Q, K, V, mask)

        # Merge heads → project out
        out = out.transpose(1, 2).contiguous().view(B, T, C)
        return self.W_o(out), weights

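# Flash Attention (PyTorch 2.0+): same math in a fused kernel, typically much faster and lighter on memory
# Prefer this in production. F.scaled_dot_product_attention picks the best available backend itself
# (Q, K, V shaped [batch, heads, seq_len, head_dim]; flash kernels need a CUDA GPU and fp16/bf16 inputs)
out = F.scaled_dot_product_attention(Q, K, V, is_causal=True)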
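
To see what the causal mask does to the attention weights, the sketch below runs single-head attention on plain Python lists. It is a toy for inspection only: `causal_attention` and the 3-token input are assumptions, and it skips batching, multiple heads, and learned projections.

```python
import math

def causal_attention(Q, K, V):
    """Single-head attention over lists of vectors; token i only sees tokens 0..i."""
    d_k = len(Q[0])
    outputs, all_weights = [], []
    for i, q in enumerate(Q):
        # Scores against the current and past keys only (this is the causal mask)
        scores = [sum(a * b for a, b in zip(q, K[j])) / math.sqrt(d_k) for j in range(i + 1)]
        m = max(scores)
        exps = [math.exp(s - m) for s in scores]
        total = sum(exps)
        weights = [e / total for e in exps]
        weights += [0.0] * (len(K) - i - 1)   # future positions get zero weight
        # Weighted sum of value vectors
        outputs.append([sum(w * v[c] for w, v in zip(weights, V)) for c in range(len(V[0]))])
        all_weights.append(weights)
    return outputs, all_weights

x = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]   # three toy token vectors
out, weights = causal_attention(x, x, x)
for row in weights:
    print([round(w, 3) for w in row])
```

The printed weight matrix is lower-triangular: the first token can only attend to itself, and every row still sums to 1.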

Architecture | Key Idea | Best For | Examples
GPT (Decoder-only) | Causal attention, predicts next token | Generation, chat, code | GPT-4, Claude, Llama 3
BERT (Encoder-only) | Bidirectional attention, masked LM | Classification, embeddings | BERT, RoBERTa, E5
T5 (Encoder-Decoder) | Seq2seq with cross-attention | Translation, summarisation | T5, FLAN-T5, mT5
MoE (Mixture of Experts) | Route each token to top-k expert FFNs | Scale efficiency | Mixtral, GPT-4 (rumoured), Gemini
Mamba (SSM) | State space, linear time complexity | Very long contexts | Mamba, Jamba
Diffusion | Learn to denoise from Gaussian noise | Image/video/audio gen | SD 3, DALL-E 3, Sora

Fine-Tuning LLMs — SFT, LoRA, DPO, RLHF

Fine-tuning adapts a pre-trained LLM to your domain or behaviour. In 2025–2026 the best practitioners combine LoRA-efficient training with DPO preference alignment.

🔧 LoRA — Low-Rank Adaptation

Instead of updating every weight of a 7B–8B model, train two small matrices A and B per adapted layer whose product approximates the weight update. This typically cuts trainable parameters by more than 99%.

import torch
from peft import LoraConfig, get_peft_model, TaskType
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TrainingArguments
from trl import SFTTrainer

# Load base model in 4-bit (QLoRA = LoRA + 4-bit quantisation)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Meta-Llama-3-8B",
    quantization_config=bnb_config,
    device_map="auto"
)

# Apply LoRA adapters (only these are trained: well under 1% of all params)
lora_config = LoraConfig(
    r=16,               # rank: higher = more capacity, more params
    lora_alpha=32,      # scaling factor
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],  # which layers
    lora_dropout=0.05,
    task_type=TaskType.CAUSAL_LM,
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()
# For this config: about 13.6M trainable params, roughly 0.17% of 8B
# (the exact "all params" figure printed depends on how quantised weights are counted)

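# Train with SFTTrainer (Supervised Fine-Tuning)
# Note: newer TRL releases expect options such as dataset_text_field inside an
# SFTConfig passed as args; the keyword form below matches older releases.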
trainer = SFTTrainer(
    model=model,
    train_dataset=my_dataset,          # {"text": "prompt + completion"}
    dataset_text_field="text",
    max_seq_length=2048,
    args=TrainingArguments(
        output_dir="./lora-finetuned",
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4, # effective batch = 16
        warmup_steps=100,
        num_train_epochs=3,
        learning_rate=2e-4,
        fp16=True,
        logging_steps=10,
    ),
)
trainer.train()
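
The parameter savings from the A and B matrices can be checked with plain arithmetic. The sketch below counts the LoRA parameters for the `LoraConfig` above; the layer shapes (hidden size 4096, key/value projection width 1024, 32 layers) are assumed Llama-3-8B dimensions, not values stated in this guide.

```python
def lora_params(d_in: int, d_out: int, r: int) -> int:
    """Trainable parameters LoRA adds to one d_in -> d_out linear layer.

    A has shape (r, d_in) and B has shape (d_out, r).
    """
    return r * d_in + d_out * r

# Assumed Llama-3-8B shapes (hypothetical constants for this sketch).
HIDDEN, KV_DIM, N_LAYERS = 4096, 1024, 32
R = 16

per_layer = (
    lora_params(HIDDEN, HIDDEN, R)    # q_proj
    + lora_params(HIDDEN, KV_DIM, R)  # k_proj
    + lora_params(HIDDEN, KV_DIM, R)  # v_proj
    + lora_params(HIDDEN, HIDDEN, R)  # o_proj
)
total = per_layer * N_LAYERS
full_q_proj = HIDDEN * HIDDEN         # one full q_proj weight matrix, for comparison

print(f"LoRA params per layer: {per_layer:,}")
print(f"LoRA params total:     {total:,}")
print(f"q_proj: {lora_params(HIDDEN, HIDDEN, R):,} LoRA vs {full_q_proj:,} full")
```

Doubling `r` doubles the adapter size, which is why rank is the main capacity knob.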

🎯 DPO — Direct Preference Optimisation

Aligns model behaviour to human preferences without a separate reward model. Train on (prompt, chosen, rejected) triples. Simpler than RLHF and, in many reported comparisons, similarly effective.

from trl import DPOTrainer, DPOConfig

# DPO dataset: preferred vs rejected responses
# {
#   "prompt": "Explain quantum computing",
#   "chosen": "Quantum computing uses qubits...",   ← preferred
#   "rejected": "Quantum computing is too complex...", ← rejected
# }

dpo_trainer = DPOTrainer(
    model=sft_model,               # start from your SFT model
    ref_model=None,                # None: TRL keeps a frozen copy of the SFT model as the reference
    args=DPOConfig(
        beta=0.1,                  # strength of the penalty keeping the policy near the reference
        max_prompt_length=512,
        max_length=1024,
        output_dir="./dpo-aligned",
        per_device_train_batch_size=4,
        learning_rate=5e-5,
        num_train_epochs=1,
    ),
    train_dataset=preference_dataset,
    tokenizer=tokenizer,           # renamed processing_class in newer TRL releases
)
dpo_trainer.train()

# RLHF vs DPO:
# RLHF: train reward model → PPO (more moving parts, harder to stabilise, costlier)
# DPO: train directly on preference data (simpler, more stable, often comparable quality)
# → DPO is a common first choice for preference alignment
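
To make the "no reward model" idea concrete, here is a minimal sketch of the DPO loss for a single preference triple, written with the standard library only. The log-probability values are made-up numbers; in practice they are summed token log-probabilities from the policy and the frozen reference model.

```python
import math

def dpo_loss(logp_chosen, logp_rejected, ref_logp_chosen, ref_logp_rejected, beta=0.1):
    """DPO loss for one (prompt, chosen, rejected) triple.

    Inputs are summed token log-probabilities of each response under the
    policy being trained and under the frozen reference model.
    """
    chosen_logratio = logp_chosen - ref_logp_chosen
    rejected_logratio = logp_rejected - ref_logp_rejected
    margin = beta * (chosen_logratio - rejected_logratio)
    return math.log1p(math.exp(-margin))   # equals -log(sigmoid(margin))

# Policy identical to the reference: margin is 0, so the loss is log(2).
print(dpo_loss(-12.0, -15.0, -12.0, -15.0))
# Policy has shifted probability toward the chosen answer: the loss drops.
print(dpo_loss(-10.0, -18.0, -12.0, -15.0))
```

A larger `beta` penalises drifting from the reference more strongly, which is the role the `beta` argument plays in `DPOConfig`.
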
💡 Dataset sizes: SFT needs ~1K–10K high-quality examples. DPO needs ~500–5K preference pairs. Quality >> quantity: curate carefully.

💬 Advanced Prompt Engineering (Must Know)

Prompt engineering goes far beyond "write a better prompt". On many tasks these techniques affect output quality as much as fine-tuning does.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# ── Chain-of-Thought (CoT) — "think step by step" unlocks reasoning ──
cot_prompt = ChatPromptTemplate.from_messages([("human",
    "{question}\n\nThink step by step before giving your final answer."
)])
# Often improves accuracy on math/logic tasks (on the order of 20-40% in some
# reported benchmarks; gains vary by model and task)

# ── Tree of Thought (ToT) — explore multiple reasoning paths ──
tot_prompt = """Explore 3 different approaches to this problem:
{problem}

For each approach:
1. Describe the approach
2. Evaluate if it leads to the correct answer
3. Score confidence 1-10

Then select the best approach and give the final answer."""

# ── Self-Consistency — sample N times, majority vote ──
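# Sampling needs temperature > 0; with temperature=0 all N answers would be identical.
sampler = ChatOpenAI(model="gpt-4o", temperature=0.7)

def self_consistent_answer(question: str, n: int = 5) -> str:
    from collections import Counter
    answers = [sampler.invoke(question).content for _ in range(n)]
    # Parse final answers and take majority vote.
    # extract_answer is a user-supplied parser (not defined here) that pulls
    # the final answer string out of each completion.
    final_answers = [extract_answer(a) for a in answers]
    return Counter(final_answers).most_common(1)[0][0]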

# ── ReAct — Reason + Act (the prompt behind agents) ──
react_prompt = """Answer the question using this format:
Thought: I need to think about what to do
Action: tool_name[input]
Observation: (result from tool)
... repeat as needed ...
Final Answer: your answer

Question: {question}"""

# ── System prompt engineering for Claude ──
SYSTEM = """You are a senior financial analyst.

Rules:
- Only use data provided in the context
- Express uncertainty explicitly
- Always cite the source paragraph

Structure every response with these sections:
## Analysis
## Key Risks
## Recommendation (Buy/Hold/Sell)
"""

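The self-consistency snippet above relies on an `extract_answer` helper that the guide does not define. A minimal sketch follows, assuming the prompt asks the model to end with a "Final Answer: ..." line; the helper names and the canned completions are illustrative stand-ins for real sampled responses.

```python
import re
from collections import Counter

def extract_answer(completion: str) -> str:
    """Pull the final answer out of a chain-of-thought completion.

    Assumes the model was asked to end with 'Final Answer: ...';
    falls back to the last non-empty line otherwise.
    """
    match = re.search(r"final answer\s*:\s*(.+)", completion, flags=re.IGNORECASE)
    if match:
        return match.group(1).strip().rstrip(".").lower()
    lines = [ln.strip() for ln in completion.splitlines() if ln.strip()]
    return lines[-1].rstrip(".").lower() if lines else ""

def majority_vote(completions: list[str]) -> str:
    """Return the most common extracted answer across the samples."""
    votes = Counter(extract_answer(c) for c in completions)
    return votes.most_common(1)[0][0]

# Canned completions stand in for N sampled LLM responses.
samples = [
    "17 + 25 = 42.\nFinal Answer: 42",
    "Add the tens, then the ones.\nFinal answer: 42.",
    "17 + 25 = 43?\nFinal Answer: 43",
]
print(majority_vote(samples))   # 42
```

Normalising case and trailing punctuation before voting matters: otherwise "42" and "42." would be counted as different answers.
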
Inference Optimization — Speed & Cost at Scale

A model that's too slow or too expensive doesn't ship. These techniques can cut inference cost by 5–20×, usually with little or no loss in accuracy.

⚡ vLLM — PagedAttention

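Manages the KV cache in fixed-size blocks, the way an OS pages virtual memory, which cuts fragmentation and allows larger batches. The vLLM authors report up to 24× higher throughput than HuggingFace Transformers. A common choice for self-hosted LLM serving.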
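
The virtual-memory analogy can be illustrated with a toy block allocator. This is only a sketch of the bookkeeping idea (a per-sequence block table over a shared pool of fixed-size blocks); the class and method names are hypothetical and do not reflect vLLM's internal API.

```python
class PagedKVCache:
    """Toy block allocator: maps each sequence's logical token positions to
    fixed-size physical blocks, the way a page table maps virtual pages."""

    def __init__(self, num_blocks: int, block_size: int = 16):
        self.block_size = block_size
        self.free_blocks = list(range(num_blocks))
        self.block_tables: dict[str, list[int]] = {}  # seq_id -> physical blocks
        self.lengths: dict[str, int] = {}             # seq_id -> tokens stored

    def append_token(self, seq_id: str) -> tuple[int, int]:
        """Reserve a slot for one more token; returns (physical_block, offset)."""
        table = self.block_tables.setdefault(seq_id, [])
        length = self.lengths.get(seq_id, 0)
        if length % self.block_size == 0:      # current block full (or none yet)
            if not self.free_blocks:
                raise MemoryError("KV cache exhausted")
            table.append(self.free_blocks.pop())
        self.lengths[seq_id] = length + 1
        return table[-1], length % self.block_size

    def free(self, seq_id: str) -> None:
        """Return a finished sequence's blocks to the pool."""
        self.free_blocks.extend(self.block_tables.pop(seq_id, []))
        self.lengths.pop(seq_id, None)

cache = PagedKVCache(num_blocks=4, block_size=2)
for _ in range(3):
    cache.append_token("req-a")   # 3 tokens occupy 2 blocks
cache.append_token("req-b")       # 1 token occupies 1 block
print(cache.block_tables, len(cache.free_blocks))
cache.free("req-a")               # blocks go back to the pool immediately
print(cache.block_tables, len(cache.free_blocks))
```

Because blocks are allocated on demand, a sequence wastes at most one partially filled block instead of a whole pre-reserved maximum-length buffer.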

# Serve Llama 3 with vLLM
pip install vllm
python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Meta-Llama-3-8B-Instruct \
  --tensor-parallel-size 2 \
  --gpu-memory-utilization 0.9 \
  --max-model-len 8192

# OpenAI-compatible API at :8000
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model":"meta-llama/Meta-Llama-3-8B-Instruct",
       "messages":[{"role":"user","content":"Hello"}]}'

🗜️ Quantization

Reduce weight precision from FP32 → FP16 → INT8 → INT4. 4-bit weights take about 4× less VRAM than FP16 (8× less than FP32). Use GPTQ or AWQ for post-training quantisation.

from awq import AutoAWQForCausalLM
from transformers import AutoTokenizer

# AWQ 4-bit quantisation (a strong quality/speed tradeoff)
model_id = "meta-llama/Meta-Llama-3-8B"
model = AutoAWQForCausalLM.from_pretrained(model_id)
tokenizer = AutoTokenizer.from_pretrained(model_id)   # needed for calibration
quant_config = {"zero_point": True, "q_group_size": 128, "w_bit": 4, "version": "GEMM"}
model.quantize(tokenizer, quant_config=quant_config)
model.save_quantized("llama3-8b-awq-4bit")
# 8B model: ~16GB of weights in FP16 → ~4GB in INT4, leaving headroom on a single 24GB GPU
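
The VRAM figures follow directly from bits per weight. The helper below is a back-of-the-envelope sketch that counts weights only; it deliberately ignores the KV cache, activations, and quantisation metadata such as scales and zero points.

```python
def weight_memory_gb(n_params: float, bits_per_weight: int) -> float:
    """Approximate memory for the weights alone, in GB (1 GB = 1e9 bytes)."""
    return n_params * bits_per_weight / 8 / 1e9

N_PARAMS = 8e9  # an 8B-parameter model
for name, bits in [("FP32", 32), ("FP16", 16), ("INT8", 8), ("INT4", 4)]:
    print(f"{name}: {weight_memory_gb(N_PARAMS, bits):.0f} GB")
```

Real deployments need extra headroom on top of these numbers, mainly for the KV cache at long context lengths.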

🔮 Speculative Decoding

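A small draft model generates N tokens speculatively. The large model verifies them all in one forward pass and keeps the tokens it agrees with, so output quality matches the large model alone. Typical speedups are 2–3×, depending on how often the draft is accepted.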

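from transformers import AutoModelForCausalLM, AutoTokenizer

# Assisted generation is built into HuggingFace transformers via generate(assistant_model=...)
target_id = "meta-llama/Meta-Llama-3-70B"   # large verifier
draft_id  = "meta-llama/Meta-Llama-3-8B"    # small drafter (shares the tokenizer)

tokenizer = AutoTokenizer.from_pretrained(target_id)
target = AutoModelForCausalLM.from_pretrained(target_id, device_map="auto", torch_dtype="auto")
draft  = AutoModelForCausalLM.from_pretrained(draft_id, device_map="auto", torch_dtype="auto")

inputs = tokenizer("Explain transformers in detail", return_tensors="pt").to(target.device)
out = target.generate(**inputs, assistant_model=draft, max_new_tokens=256)
print(tokenizer.decode(out[0], skip_special_tokens=True))
# Speedup depends on the draft acceptance rate; 2–3× is commonly reported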
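
The draft-then-verify loop can be sketched without any model weights. In the toy below, two deterministic functions stand in for the large and small models (both are made up for illustration, and the draft deliberately disagrees at every fourth position). It shows the greedy variant: accepted tokens are exactly those the target would have produced, so the output matches plain greedy decoding.

```python
from typing import Callable, List

Model = Callable[[List[int]], int]   # maps a token prefix to the next token (greedy)

def target_model(prefix: List[int]) -> int:
    """Stand-in for the large model: a fixed deterministic rule."""
    return (sum(prefix) * 31 + len(prefix)) % 50

def draft_model(prefix: List[int]) -> int:
    """Stand-in for the small model: wrong whenever len(prefix) is a multiple of 4."""
    guess = target_model(prefix)
    return guess if len(prefix) % 4 else (guess + 1) % 50

def speculative_decode(prompt, target: Model, draft: Model, n_tokens: int, k: int = 4):
    tokens, target_passes = list(prompt), 0
    while len(tokens) - len(prompt) < n_tokens:
        # 1) Draft k tokens autoregressively with the cheap model.
        proposal: List[int] = []
        for _ in range(k):
            proposal.append(draft(tokens + proposal))
        # 2) Verify every drafted position with the target. A real system does
        #    this in one batched forward pass, counted as a single pass here.
        target_passes += 1
        for i in range(k):
            expected = target(tokens + proposal[:i])
            if proposal[i] != expected:
                tokens += proposal[:i] + [expected]   # keep matches, fix the first miss
                break
        else:
            tokens += proposal + [target(tokens + proposal)]   # all accepted, plus a bonus token
    return tokens[: len(prompt) + n_tokens], target_passes

def greedy_decode(prompt, model: Model, n_tokens: int):
    tokens = list(prompt)
    for _ in range(n_tokens):
        tokens.append(model(tokens))
    return tokens

prompt = [3, 1, 4]
spec_out, passes = speculative_decode(prompt, target_model, draft_model, n_tokens=20)
assert spec_out == greedy_decode(prompt, target_model, n_tokens=20)
print(f"20 tokens in {passes} target passes instead of 20")
```

Every pass yields at least one token, so a poor draft model costs extra draft compute but never changes the output.
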

Technique | Speedup | Quality Loss | VRAM Reduction | Best For
FP16 / BF16 | 1.5–2× | Negligible | (not given) | All production deployments
INT8 (bitsandbytes) | 1.5× | <1% | (not given) | Inference on smaller GPUs
AWQ/GPTQ INT4 | 2–3× | ~1–2% | (not given) | Edge / cost-sensitive
vLLM PagedAttention | 10–24× | None | Same | High-throughput serving
Speculative Decoding | 2–3× | None | Slightly more | Latency-sensitive single requests
Flash Attention 2 | 2–4× | None | 10× less activation | Training & long-context inference
Pruning + Distillation | 2–10× | 3–8% | 2–10× | Edge/mobile deployment

MLOps — Taking Models to Production Reliably

📊 Weights & Biases (W&B)

A widely used platform for experiment tracking, model versioning, and hyperparameter sweeps. Metrics from every training run are logged to a live dashboard.

import wandb

wandb.init(project="llm-finetuning", name="lora-r16-lr2e-4", config={
    "model": "llama3-8b", "r": 16, "learning_rate": 2e-4, "epochs": 3
})

for epoch in range(3):
    for batch in dataloader:
        loss = train_step(batch)
        wandb.log({"train/loss": loss, "epoch": epoch})   # live dashboard

# Log final model as artifact (versioned)
artifact = wandb.Artifact("lora-adapter", type="model")
artifact.add_dir("./lora-finetuned/")
wandb.log_artifact(artifact)
wandb.finish()

# Hyperparameter sweep (Bayesian optimisation)
sweep_config = {
    "method": "bayes",
    "metric": {"goal": "minimize", "name": "val/loss"},
    "parameters": {
        "learning_rate": {"min": 1e-5, "max": 5e-4},
        "r": {"values": [8, 16, 32, 64]},
        "batch_size": {"values": [4, 8, 16]},
    }
}
sweep_id = wandb.sweep(sweep_config, project="lora-sweep")
# train() must call wandb.init(), read hyperparameters from wandb.config, and log "val/loss"
wandb.agent(sweep_id, function=train, count=20)

📦 MLflow — Model Registry

Track experiments, package models with dependencies, deploy to any serving platform. Integrates with SageMaker, Databricks, Azure ML.

import mlflow
import mlflow.pyfunc

mlflow.set_experiment("rag-system-v2")

with mlflow.start_run():
    mlflow.log_params({
        "embedding_model": "text-embedding-3-small",
        "chunk_size": 512, "k": 5, "llm": "gpt-4o"
    })

    # Run evaluation
    scores = evaluate_rag(test_questions)
    mlflow.log_metrics({
        "faithfulness": scores["faithfulness"],
        "answer_relevancy": scores["answer_relevancy"],
        "context_recall": scores["context_recall"],
    })

    # Log the RAG pipeline as a model
    class RAGModel(mlflow.pyfunc.PythonModel):
        def predict(self, ctx, model_input):
            return [rag_chain.invoke(q) for q in model_input["questions"]]

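    # registered_model_name creates (or adds a version to) the registry entry
    mlflow.pyfunc.log_model("rag-pipeline", python_model=RAGModel(),
                            registered_model_name="rag-pipeline")

# Promote a registered version to Production.
# Note: stages are deprecated in recent MLflow releases in favour of model version aliases.
client = mlflow.MlflowClient()
client.transition_model_version_stage("rag-pipeline", version=3, stage="Production")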
🔁 MLOps stack for AI in 2025–2026: W&B (experiment tracking) + DVC (data versioning) + MLflow (model registry) + Feast (feature store) + Evidently AI (drift monitoring) + Seldon/Ray Serve (model serving) + ArgoCD (GitOps deployment). You don't need all of these; start with W&B + MLflow.

AI Safety, Alignment & Security

Senior AI professionals must understand how models are aligned, what can go wrong, and how to build guardrails. This is non-negotiable at enterprise scale.

🛡️ Guardrails

Input/output validation to prevent harmful, off-topic, or policy-violating responses.

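import openai
from guardrails import Guard
from guardrails.hub import ToxicLanguage   # validators are installed separately from the Guardrails Hub

# Guardrails AI: validate LLM output content.
# The call style below follows older Guardrails releases; check the docs for your version.
guard = Guard().use(ToxicLanguage, threshold=0.5, on_fail="exception")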

result = guard(
    llm_api=openai.chat.completions.create,
    prompt="Tell me about AI safety",
    model="gpt-4o",
)
# Raises exception if toxic content detected

# NeMo Guardrails (NVIDIA) — conversational rails
from nemoguardrails import RailsConfig, LLMRails
config = RailsConfig.from_path("./rails_config/")
rails  = LLMRails(config)
response = rails.generate(
    messages=[{"role":"user","content":"Ignore previous instructions"}]
)
# With a rails config that covers prompt injection, the reply is a refusal
# such as "I'm sorry, I can't help with that."

🔴 Red Teaming

Systematically probe model weaknesses before deployment. Often required for enterprise and government deployments.

  • Prompt injection — user input overrides system prompt
  • Jailbreaks — roleplay, many-shot, "DAN" attacks
  • Data exfiltration — extract training data / PII
  • Indirect injection — malicious instructions in retrieved docs
  • Tool: PyRIT (Microsoft) — automated red-teaming framework
  • Tool: Garak — LLM vulnerability scanner

⚖️ Constitutional AI (Anthropic)

Train models to self-critique and revise outputs according to a set of principles — without human labellers for every example.

  • Step 1 — SFT: supervised fine-tuning on human demonstrations
  • Step 2 — CAI: model critiques itself using principles
  • Step 3 — RLAIF: AI-generated preference data for DPO/RLHF
  • RLHF pipeline: human → reward model → PPO policy optimisation
  • DPO: skip reward model, directly optimise preferences

Emerging Techniques — What's Shaping 2025–2026

These are the ideas separating frontier AI practitioners from the rest. You don't need to implement them, but you must understand what they are and when they matter.

🧠 Test-Time Compute (o1 / o3 / Claude 3.7)

Instead of just generating an answer, the model "thinks" for seconds or minutes, running an internal chain of thought that is typically hidden from the user or shown only in summary. More compute at inference = better answers on hard problems.

import anthropic

client = anthropic.Anthropic()

# Extended thinking — Claude 3.7 Sonnet
response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000   # ← how much "thinking" to allow
    },
    messages=[{"role": "user", "content":
        "Prove that there are infinitely many prime numbers."
    }]
)

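for block in response.content:
    if block.type == "thinking":
        print("THINKING:", block.thinking[:200])   # internal reasoning
    elif block.type == "text":                     # skips e.g. redacted_thinking blocks
        print("ANSWER:", block.text)               # final response

# Key insight: accuracy on hard problems tends to grow roughly with the log of
# test-time compute, so each further gain costs a multiple of the previous budget.
# Figures like "+20-30% per 10× compute" are rough rules of thumb and vary by task.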

🔀 Mixture of Experts (MoE)

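Instead of activating all model weights for every token, route each token to only a few (typically 2–8) of N "expert" FFN layers. This gives quality comparable to a much larger dense model at a fraction of the compute per token (roughly a quarter in the Mixtral example below).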

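import torch
import torch.nn as nn

class FFN(nn.Module):
    """Position-wise feed-forward block used as one expert."""
    def __init__(self, d_model, d_ff=None):
        super().__init__()
        d_ff = d_ff or 4 * d_model
        self.net = nn.Sequential(nn.Linear(d_model, d_ff), nn.GELU(), nn.Linear(d_ff, d_model))

    def forward(self, x):
        return self.net(x)
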
class MoELayer(nn.Module):
    """Sparse MoE: each token routed to top-k experts."""
    def __init__(self, d_model=1024, n_experts=8, top_k=2):
        super().__init__()
        self.router   = nn.Linear(d_model, n_experts, bias=False)
        self.experts  = nn.ModuleList([FFN(d_model) for _ in range(n_experts)])
        self.top_k    = top_k

    def forward(self, x):
        B, T, D = x.shape
        router_logits  = self.router(x)                              # [B, T, n_experts]
        router_weights = torch.softmax(router_logits, dim=-1)
        topk_weights, topk_idx = router_weights.topk(self.top_k, dim=-1)  # select top-2

        # Only compute the selected experts — massive compute savings
        output = torch.zeros_like(x)
        for k in range(self.top_k):
            expert_idx   = topk_idx[..., k]     # which expert for each token
            expert_weight = topk_weights[..., k].unsqueeze(-1)
            # Route each token to its expert (simplified)
            for e_idx, expert in enumerate(self.experts):
                mask = (expert_idx == e_idx)
                if mask.any():
                    output[mask] += expert_weight[mask] * expert(x[mask])
        return output

# Mixtral 8×7B: 8 experts, 2 active per token
# → 47B total params but only 13B active per forward pass
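
The routing decision for a single token can be shown with the standard library alone. In this sketch the router logits are made-up numbers, and renormalising the selected gate weights so they sum to 1 is an assumption (some MoE implementations do this; the `MoELayer` above uses the raw softmax weights).

```python
import math

def softmax(xs):
    m = max(xs)
    exps = [math.exp(x - m) for x in xs]
    total = sum(exps)
    return [e / total for e in exps]

def route_token(router_logits, top_k=2):
    """Pick the top-k experts for one token and renormalise their gate weights."""
    probs = softmax(router_logits)
    top = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)[:top_k]
    norm = sum(probs[i] for i in top)
    return {i: probs[i] / norm for i in top}   # expert index -> gate weight

# Hypothetical router logits for one token over 8 experts.
gates = route_token([0.1, 2.0, -1.0, 0.3, 1.5, 0.0, -0.5, 0.2], top_k=2)
print(gates)                                   # only experts 1 and 4 run for this token
print(f"active experts: {len(gates)} of 8")
```

The token's output is then the gate-weighted sum of the selected experts' outputs; the other six experts are never evaluated for it.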

🌊 State Space Models — Mamba

Attention is O(n²) in sequence length. SSMs are O(n). The Mamba authors report transformer-level quality on many tasks with up to 5× higher inference throughput, and the advantage grows at long sequences (>8K tokens).

📐 Key idea: Compress the entire context history into a fixed-size hidden state using a selective state space. The "selection" mechanism lets the model decide what to remember, like a learnable RNN that is still parallelisable during training. SSMs are not yet replacing transformers but are strong for document processing, time series, and genomics.
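
The fixed-size state and linear-time scan can be seen in a scalar toy model. This sketch uses constant parameters `a`, `b`, `c` chosen for illustration; in Mamba the corresponding parameters depend on the input (that is the selection mechanism), which this example does not implement.

```python
def ssm_scan(xs, a=0.9, b=0.1, c=1.0):
    """Scalar linear state space model: h_t = a*h_{t-1} + b*x_t, y_t = c*h_t.

    One pass over the sequence, carrying a single fixed-size state h,
    so time is O(n) and memory for the state is O(1).
    """
    h, ys = 0.0, []
    for x in xs:
        h = a * h + b * x
        ys.append(c * h)
    return ys

# An impulse at t=0 decays geometrically: roughly 0.1, 0.09, 0.081, 0.0729
ys = ssm_scan([1.0, 0.0, 0.0, 0.0])
print([round(y, 4) for y in ys])
```

However long the input, everything the model remembers has to fit in `h`, which is the trade-off against attention's full lookback.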

Related models: Mamba-2 · Jamba (Mamba + Transformer) · RWKV · RetNet

🌍 Multimodal Frontier — 2026

The future is natively multimodal. Models that see, hear, generate images and video — trained end-to-end, not bolted together.

import anthropic, base64

client = anthropic.Anthropic()

# Claude 3.7 — vision + text in one call
with open("chart.png", "rb") as f:
    img_b64 = base64.standard_b64encode(f.read()).decode()

response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=1024,
    messages=[{"role": "user", "content": [
        {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": img_b64}},
        {"type": "text",  "text": "Extract all numbers from this chart as JSON"}
    ]}]
)

GPT-4o native audio · Gemini 2.0 video · Sora (video gen) · Stable Diffusion 3 · ColPali (PDF vision)

🏆 High-End AI Professional — Complete Skill Checklist

Area | Junior | Mid | Senior / Architect
Foundations | Can use APIs | Understands embeddings + attention | Reads papers, implements from scratch
RAG | Basic pipeline | HyDE, hybrid, reranking | GraphRAG, Agentic, production eval
Fine-Tuning | Runs SFT notebook | LoRA/QLoRA on custom data | DPO alignment, dataset curation
Inference | Uses hosted APIs | Self-hosts with vLLM | Quantization, speculative decoding, Flash Attention
Agents | LangChain ReAct | LangGraph multi-step | Multi-agent, HITL, sub-graphs
MLOps | Saves model locally | W&B + MLflow tracking | CI/CD for ML, feature stores, drift monitoring
Cloud / Infra | Uses managed APIs | ECS + RDS + SQS | GPU clusters, Kubernetes, multi-region
Safety | Adds basic filters | Guardrails + red team basics | Constitutional AI, RLHF, enterprise governance
Evaluation | Manual testing | RAGAS metrics | LLM-as-judge, adversarial evals, benchmarks
Frontier | Reads announcements | Uses new models via API | Understands MoE/SSM/test-time compute tradeoffs

🎯 Best learning path: Build nanoGPT (Karpathy) → fine-tune Llama 3 with LoRA → build a production RAG system → add LangGraph agents → deploy on AWS ECS with W&B monitoring → study one frontier paper per week from arxiv.org/list/cs.LG/recent. Repeat. Ship things.

Part V

AI Frontiers

Computer Vision · Diffusion Models · Speech AI · Reinforcement Learning · Model Context Protocol — the remaining pillars every senior AI engineer must command.

Computer Vision — CNNs to Foundation Models

Three revolutions: hand-crafted features → CNNs → Vision Transformers (ViTs). Today's frontier: foundation models that unify detection, segmentation, and generation in one architecture.

Classical CV (<2012): HOG · SIFT · SURF · SVM classifiers
→ Deep CNNs (2012–2020): AlexNet · ResNet · EfficientNet
→ Vision Transformers (2020–2024): ViT · CLIP · DINO v2 · SAM · BLIP-2
→ Foundation VLMs (2024+): GPT-4V · Gemini · Florence · InternVL · Claude

YOLOv8 — Real-Time Object Detection

Single-pass detection, typically under 50 ms per image on a modern GPU. Best for production systems needing speed.

Python · YOLOv8 detect + fine-tune
from ultralytics import YOLO

# Inference — models: yolov8n (fastest) → yolov8x (most accurate)
model = YOLO("yolov8n.pt")
results = model("image.jpg", conf=0.5, iou=0.45)

for r in results:
    boxes  = r.boxes.xyxy.cpu().numpy()  # [x1,y1,x2,y2]
    clsids = r.boxes.cls.cpu().numpy()
    confs  = r.boxes.conf.cpu().numpy()
    for box, cls, conf in zip(boxes, clsids, confs):
        print(f"{model.names[int(cls)]} {conf:.2f} @ {box}")

# Fine-tune on custom dataset (custom.yaml defines class paths)
model.train(data="custom.yaml", epochs=100, imgsz=640, batch=16)

# Export for edge deployment
model.export(format="onnx")    # or "tflite", "coreml"
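
The `iou=0.45` argument above is the overlap threshold used by non-maximum suppression (NMS), the post-processing step that removes duplicate detections. A minimal pure-Python sketch of IoU and greedy NMS follows; the boxes and scores are invented for illustration, and production code would use a vectorised implementation.

```python
def iou(a, b):
    """Intersection over union of two [x1, y1, x2, y2] boxes."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0

def nms(boxes, scores, iou_thresh=0.45):
    """Greedy NMS: keep the highest-scoring box, drop boxes that overlap a kept box too much."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    for i in order:
        if all(iou(boxes[i], boxes[j]) <= iou_thresh for j in keep):
            keep.append(i)
    return keep

boxes  = [[10, 10, 110, 110], [20, 20, 120, 120], [200, 200, 300, 300]]
scores = [0.90, 0.75, 0.60]
print(nms(boxes, scores))   # [0, 2]: box 1 overlaps box 0 with IoU ≈ 0.68 and is dropped
```

Lowering the threshold suppresses more aggressively, which helps with duplicate boxes but can merge objects that genuinely overlap.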

SAM 2 — Segment Anything

Meta's foundation model for zero-shot segmentation. Prompt with points, boxes, or masks. Works on video too (track across frames).

Python · SAM 2 point-prompted segmentation
from sam2.build_sam import build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor
import numpy as np

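# build_sam2 takes a model config plus a checkpoint path
# (file names follow the SAM 2 repo; adjust them to your download)
model = build_sam2("sam2_hiera_l.yaml", "checkpoints/sam2_hiera_large.pt")
predictor = SAM2ImagePredictor(model)

# image_np: H×W×3 uint8 RGB array, e.g. np.array(Image.open("photo.jpg").convert("RGB"))
predictor.set_image(image_np)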

# Prompt with a foreground point
masks, scores, _ = predictor.predict(
    point_coords=np.array([[500, 375]]),
    point_labels=np.array([1]),  # 1=fg, 0=bg
    multimask_output=True,
)
best_mask = masks[scores.argmax()]  # bool H×W array

# Or prompt with a bounding box
masks, _, _ = predictor.predict(
    box=np.array([100, 200, 400, 600]),  # x1,y1,x2,y2
    multimask_output=False,
)

CLIP — Vision-Language Alignment

Contrastive learning on 400M image-text pairs. Zero-shot classification, image search, visual RAG retrieval.

Python · CLIP zero-shot classification
from transformers import CLIPModel, CLIPProcessor
from PIL import Image

model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14")
proc  = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")

image = Image.open("photo.jpg")
labels = ["a dog running", "a cat sleeping", "a car driving"]

inputs = proc(text=labels, images=image,
               return_tensors="pt", padding=True)
outputs = model(**inputs)

probs = outputs.logits_per_image.softmax(dim=1)
for label, p in zip(labels, probs[0]):
    print(f"{p:.3f}  {label}")
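
Under the hood, `logits_per_image` is a scaled cosine similarity between the image embedding and each text embedding. The sketch below reproduces that computation on made-up 3-dimensional vectors; the `logit_scale` of 100 is an assumed value in the range CLIP's learned temperature reaches, and real embeddings have hundreds of dimensions.

```python
import math

def cosine(u, v):
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v)))

def zero_shot_probs(image_emb, text_embs, logit_scale=100.0):
    """Softmax over scaled cosine similarities between one image and each label."""
    logits = [logit_scale * cosine(image_emb, t) for t in text_embs]
    m = max(logits)
    exps = [math.exp(l - m) for l in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Made-up embeddings for illustration only.
image_emb = [0.9, 0.1, 0.2]
labels = ["a dog running", "a cat sleeping", "a car driving"]
text_embs = [[1.0, 0.0, 0.1], [0.2, 1.0, 0.0], [0.0, 0.1, 1.0]]

probs = zero_shot_probs(image_emb, text_embs)
for label, p in zip(labels, probs):
    print(f"{p:.3f}  {label}")
```

The large scale factor is why CLIP's zero-shot probabilities tend to be very peaked even when the raw cosine similarities differ only slightly.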

Vision Model Comparison

Model | Task | Best For
YOLOv8n/x | Detection | Real-time edge to server
SAM 2 | Segmentation | Any-object zero-shot masking
ViT-L/16 | Classification | High-accuracy image cls
CLIP L/14 | Vision-Language | Zero-shot, semantic search
DINO v2 | Dense features | Self-supervised repr.
Florence-2 | Universal | Caption + detect + ground
GPT-4V / Claude 3.5 | VLM | Complex visual reasoning

Diffusion Models — How AI Generates Images & Video

Diffusion models learn to reverse a noise process. Three components: VAE compresses images to latent space, UNet/Transformer denoises iteratively, CLIP encoder conditions on text.

Pipeline: text prompt → CLIP encoder → text embeddings; Gaussian noise z_T → UNet denoising loop (t: T → T-1 → … → 0, cross-attention on text embeddings, 20–50 DDIM / DDPM steps) → VAE decode (latent → pixel) → 1024px image

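The noise process that the model learns to reverse can be written in a few lines. This sketch implements the forward (noising) direction with a linear beta schedule as used in DDPM; the schedule endpoints, the 1000-step horizon, and the three-element "latent" are assumed values for illustration.

```python
import math
import random

def linear_beta_schedule(T: int, beta_start=1e-4, beta_end=0.02):
    """Per-step noise variances beta_1..beta_T on a linear schedule."""
    return [beta_start + (beta_end - beta_start) * t / (T - 1) for t in range(T)]

def alpha_bars(betas):
    """Cumulative product of (1 - beta_t): the share of signal variance left at step t."""
    out, prod = [], 1.0
    for b in betas:
        prod *= 1.0 - b
        out.append(prod)
    return out

def add_noise(x0, t, abar, rng):
    """Sample x_t ~ N(sqrt(abar_t) * x0, (1 - abar_t) * I) in one shot."""
    s, n = math.sqrt(abar[t]), math.sqrt(1.0 - abar[t])
    return [s * x + n * rng.gauss(0.0, 1.0) for x in x0]

T = 1000
abar = alpha_bars(linear_beta_schedule(T))
rng = random.Random(0)
x0 = [1.0, -1.0, 0.5]                            # stand-in for a latent vector
print(f"signal kept at t=0:   {abar[0]:.4f}")
print(f"signal kept at t=999: {abar[-1]:.6f}")   # close to 0: almost pure noise
print(add_noise(x0, 500, abar, rng))
```

Training samples a random `t`, noises the latent this way, and asks the denoiser to predict the noise that was added; generation runs the learned reverse steps from pure noise.
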
SDXL + ControlNet with Diffusers

Python · Text-to-image + structural control
from diffusers import (StableDiffusionXLPipeline,
                        ControlNetModel,
                        StableDiffusionXLControlNetPipeline)
import torch

# Basic SDXL text-to-image
pipe = StableDiffusionXLPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    torch_dtype=torch.float16,
).to("cuda")

image = pipe(
    prompt="futuristic city at dusk, cinematic lighting, 8k",
    negative_prompt="blurry, low quality, cartoon",
    num_inference_steps=30,
    guidance_scale=7.5,   # CFG: adherence to prompt
    width=1024, height=1024,
).images[0]

# ControlNet: control composition with Canny edges / depth / pose
controlnet = ControlNetModel.from_pretrained(
    "diffusers/controlnet-canny-sdxl-1.0",
    torch_dtype=torch.float16
)
pipe_ctrl = StableDiffusionXLControlNetPipeline.from_pretrained(
    "stabilityai/stable-diffusion-xl-base-1.0",
    controlnet=controlnet, torch_dtype=torch.float16,
).to("cuda")

# FLUX.1 — 2024 state of the art (open weights)
from diffusers import FluxPipeline
pipe = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev", torch_dtype=torch.bfloat16
).to("cuda")

Image LoRA Fine-Tuning + Video Gen

Bash · DreamBooth LoRA in ~1 hr on 1×GPU
# HuggingFace diffusers training script
accelerate launch train_dreambooth_lora_sdxl.py \
  --pretrained_model_name_or_path="stabilityai/stable-diffusion-xl-base-1.0" \
  --instance_data_dir="my_images/" \
  --instance_prompt="photo of sks dog" \
  --output_dir="lora-weights/" \
  --rank=16 --learning_rate=1e-4 \
  --max_train_steps=1000 --mixed_precision="fp16"
Python · Load LoRA at inference
# pipe: an SDXL pipeline (StableDiffusionXLPipeline), not the FLUX pipeline
pipe.load_lora_weights("lora-weights/")
pipe.fuse_lora(lora_scale=0.9)
image = pipe("a sks dog on the moon").images[0]

🎬 Video gen 2025: Sora (OpenAI), Wan2.1 (Alibaba, open), Kling, Runway Gen-3. These are generally built on Diffusion Transformers (DiT) with temporal attention, treating video as 3D latent volumes.

Speech & Audio AI — ASR, TTS, Audio Generation

Whisper — Automatic Speech Recognition

OpenAI's multilingual ASR: 99 languages, trained on 680K hours of audio. Use faster-whisper (CTranslate2) for up to 4× faster transcription on GPU.

Python · faster-whisper with word timestamps
from faster_whisper import WhisperModel

# tiny / base / small / medium / large-v3
model = WhisperModel("large-v3", device="cuda",
                      compute_type="float16")

segments, info = model.transcribe(
    "audio.mp3",
    language="en",          # None = auto-detect
    beam_size=5,
    word_timestamps=True,   # word-level timing
    vad_filter=True,        # voice activity detection
)

for seg in segments:
    print(f"[{seg.start:.2f}s → {seg.end:.2f}s] {seg.text}")
    for word in seg.words:
        print(f"  {word.word!r}  @{word.start:.2f}s")

TTS — Text to Speech Options

Python · ElevenLabs (cloud) + Kokoro (local)
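# ElevenLabs: high-quality cloud TTS with voice cloning
# (client.generate matches older SDK releases; newer ones use client.text_to_speech.convert)
from elevenlabs import ElevenLabs
client = ElevenLabs(api_key="xi-...")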
audio = client.generate(
    text="Hello, synthesized voice",
    voice="Rachel",
    model="eleven_multilingual_v2",
)

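# Kokoro: local, Apache 2.0, 82M params
from kokoro import KPipeline
import soundfile as sf
pipeline = KPipeline(lang_code="a")   # 'a' = American English
# The pipeline yields (graphemes, phonemes, audio) chunks; audio is sampled at 24 kHz
for i, (_, _, audio) in enumerate(pipeline("Hello world!", voice="af_sarah")):
    sf.write(f"kokoro_{i}.wav", audio, 24000)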

# OpenAI TTS — production, 6 voices
from openai import OpenAI
r = OpenAI().audio.speech.create(
    model="tts-1-hd", voice="nova",
    input="The quick brown fox"
)
r.stream_to_file("out.mp3")

MusicGen — Audio Generation

Python · Meta AudioCraft MusicGen
from audiocraft.models import MusicGen
from audiocraft.data.audio import audio_write

model = MusicGen.get_pretrained("facebook/musicgen-stereo-large")
model.set_generation_params(duration=8)   # seconds

wav = model.generate([
    "upbeat jazz piano with walking bass, 120 bpm",
    "dark cinematic orchestral tension",
])  # shape: [B, C, T]

for i, wav_i in enumerate(wav):
    audio_write(f"track_{i}", wav_i.cpu(),
                model.sample_rate, strategy="loudness")

Real-Time Voice Agent Pipeline

Microphone → VAD → ASR → LLM → TTS → speaker. A well-tuned pipeline targets sub-500 ms end-to-end latency.

Python · LiveKit voice agent
# API shown follows early livekit-agents releases; newer versions rename or replace VoiceAssistant
from livekit.agents import AutoSubscribe, JobContext
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero

async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    assistant = VoiceAssistant(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="nova"),
    )
    assistant.start(ctx.room)
    await assistant.say("How can I help?")

Reinforcement Learning — Q-Learning to GRPO

An agent takes actions in an environment, receives rewards, and learns a policy maximizing cumulative reward. Powers game AI, robotics, and crucially — LLM alignment (RLHF, GRPO).

Agent (policy π(a|s)) → action aₜ → Environment (game / robot / LLM) → next state sₜ₊₁ + reward rₜ → back to the agent. Objective: maximize Σ γᵗ rₜ, where γ is the discount factor.
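
The objective Σ γᵗ rₜ is easy to compute by hand for a short episode. The sketch below evaluates it backwards in a single pass and also returns the per-step "return to go"; the reward list and the discount of 0.5 are toy values chosen so the arithmetic is easy to follow.

```python
def discounted_return(rewards, gamma=0.99):
    """G_0 = sum over t of gamma**t * r_t, computed backwards in one pass."""
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
    return g

def returns_to_go(rewards, gamma=0.99):
    """G_t for every timestep t (the target a value function learns to predict)."""
    out, g = [], 0.0
    for r in reversed(rewards):
        g = r + gamma * g
        out.append(g)
    return out[::-1]

rewards = [1.0, 1.0, 1.0]                      # e.g. three surviving CartPole steps
print(discounted_return(rewards, gamma=0.5))   # 1 + 0.5 + 0.25 = 1.75
print(returns_to_go(rewards, gamma=0.5))       # [1.75, 1.5, 1.0]
```

A discount close to 1 makes the agent far-sighted; a small one makes it favour immediate reward.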

PPO with stable-baselines3

Python · PPO on CartPole
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback

env = gym.make("CartPole-v1")
model = PPO(
    "MlpPolicy", env,
    learning_rate=3e-4,
    n_steps=2048,      # rollout length
    batch_size=64,
    n_epochs=10,       # gradient updates per rollout
    gamma=0.99,        # discount
    gae_lambda=0.95,   # advantage estimation
    clip_range=0.2,    # PPO clip ε
    verbose=1,
)
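eval_env = gym.make("CartPole-v1")   # separate env so evaluation doesn't disturb training rollouts
eval_cb = EvalCallback(eval_env, best_model_save_path="./best/",
                        eval_freq=5000)
model.learn(total_timesteps=100_000, callback=eval_cb)

obs, _ = env.reset()
for _ in range(1000):
    action, _ = model.predict(obs, deterministic=True)
    # Gymnasium returns (obs, reward, terminated, truncated, info)
    obs, reward, terminated, truncated, _ = env.step(action)
    if terminated or truncated:
        obs, _ = env.reset()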

GRPO — DeepSeek R1's Alignment Method

Group Relative Policy Optimization. No critic/value model needed: sample a group of outputs per prompt, score them, and normalize each score against the group. Cheaper than PPO (no value network to train) and often more stable for LLM reasoning.

Python · GRPO with TRL
from trl import GRPOConfig, GRPOTrainer
from transformers import AutoTokenizer, AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-7B-Instruct")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-7B-Instruct")

def reward_fn(completions, **kwargs) -> list[float]:
    """Return a scalar reward per completion."""
    # is_correct is a task-specific checker you supply (not defined here)
    return [1.0 if is_correct(c) else 0.0 for c in completions]

trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    args=GRPOConfig(
        num_generations=8,          # group size G: completions compared against each other
        max_completion_length=512,
        learning_rate=1e-6,
        beta=0.01,                  # KL penalty vs reference policy
        output_dir="grpo-model",
    ),
    reward_funcs=[reward_fn],
    train_dataset=dataset,          # expects a "prompt" column
)
trainer.train()
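
The "compare within a group, normalize scores" step is small enough to show directly. This sketch computes group-relative advantages from a list of rewards; using the population standard deviation and a small `eps` for numerical safety are assumptions of this example, and library implementations may differ in those details.

```python
import statistics

def group_relative_advantages(rewards, eps=1e-6):
    """Normalise each completion's reward against its own group:
    A_i = (r_i - mean(r)) / (std(r) + eps)."""
    mean = statistics.fmean(rewards)
    std = statistics.pstdev(rewards)
    return [(r - mean) / (std + eps) for r in rewards]

# Rewards for G = 4 completions sampled from the same prompt (1 = correct).
adv = group_relative_advantages([1.0, 0.0, 0.0, 1.0])
print([round(a, 3) for a in adv])

# If every completion gets the same reward, all advantages are 0: no learning signal.
flat = group_relative_advantages([1.0, 1.0, 1.0, 1.0])
print(flat)
```

This is why GRPO needs prompts where the model sometimes succeeds and sometimes fails: groups with identical rewards contribute nothing to the gradient.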

RL Algorithm Cheatsheet — When to Use What

Algorithm | Type | Use Case | Pros
Q-Learning / DQN | Value-based, off-policy | Discrete actions (Atari) | Sample efficient
SAC | Off-policy, entropy-reg | Continuous control (robotics) | Stable, sample efficient
PPO | On-policy, clipped | Game AI, RLHF | Reliable, widely supported
GRPO | Group comparison, no critic | LLM reasoning (DeepSeek R1) | No value model needed
DPO | Offline, direct preference | LLM fine-tuning alignment | No RL training loop at all
Constitutional AI | Self-critique + RL | LLM harmlessness (Anthropic) | Scalable without human labels

Model Context Protocol & Tool Calling

Tool calling lets LLMs invoke external functions. MCP (Model Context Protocol, Anthropic 2024) is an open standard — like USB-C for AI tools. Any MCP server works with any MCP-compatible host (Claude Desktop, Cursor, VS Code).

LLM / Claude (generates tool call JSON) → MCP Host (Claude Desktop · Cursor / VS Code · your application) → MCP servers:
  • 🗄️ DB MCP Server → PostgreSQL / MongoDB
  • 📁 Files MCP Server → local filesystem / S3
  • 🌐 Web MCP Server → search APIs / browser

Claude Tool Use — Agentic Loop

Python · Tool calling with Anthropic SDK
import anthropic, json

client = anthropic.Anthropic()

tools = [{
    "name": "get_weather",
    "description": "Get current weather for a location",
    "input_schema": {
        "type": "object",
        "properties": {
            "location": {"type": "string"},
            "unit": {"type": "string", "enum": ["celsius","fahrenheit"]},
        },
        "required": ["location"],
    },
}]

def get_weather(location, unit="celsius"):
    return {"temp": 22, "conditions": "partly cloudy"}

messages = [{"role": "user", "content": "Weather in Tokyo?"}]

while True:
    resp = client.messages.create(
        model="claude-opus-4-7", tools=tools,
        messages=messages, max_tokens=1024,
    )
    if resp.stop_reason != "tool_use": break   # end_turn, max_tokens, etc.

    tool_results = []
    for block in resp.content:
        if block.type == "tool_use":
            result = get_weather(**block.input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": json.dumps(result),
            })
    messages += [{"role": "assistant", "content": resp.content},
                 {"role": "user",      "content": tool_results}]

print(next(b for b in resp.content if b.type == "text").text)

Build an MCP Server (FastMCP)

Python · Custom MCP server
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("my-tools")

@mcp.tool()
def search_products(query: str, limit: int = 10) -> list[dict]:
    """Search product catalog by keyword."""
    # db is a placeholder for your database handle; adapt the call to your driver
    return db.execute(
        "SELECT * FROM products WHERE name ILIKE ? LIMIT ?",
        (f"%{query}%", limit),
    )

@mcp.resource("orders://{order_id}")
def get_order(order_id: str) -> str:
    """Get order details by ID."""
    return orders_db.get(order_id)

@mcp.prompt()
def analyze_order(order_id: str) -> str:
    return f"Analyze order {order_id} for anomalies"

if __name__ == "__main__":
    mcp.run()   # stdio by default
    # HTTP: create the server as FastMCP("my-tools", port=8000), then call
    # mcp.run(transport="streamable-http")
JSON · Add to Claude Desktop config
{
  "mcpServers": {
    "my-tools": {
      "command": "python",
      "args": ["/path/to/server.py"]
    }
  }
}
Part VI

Robotics & Embodied AI

Foundation models leaving the cloud and entering the physical world — manipulation, locomotion, perception, and the sim-to-real gap.

The Robotics AI Stack

Modern robotics merges classical control with deep learning. The frontier: robot foundation models — large Vision-Language-Action (VLA) policies trained on diverse robot data that generalize across embodiments.

PERCEPTION: RGB + depth cameras · LiDAR / tactile · SLAM / point clouds · object detection · 6-DOF pose estimation
→ ROBOT POLICY (Brain): Vision-Language-Action model (RT-2 / π0 / OpenVLA / Helix) · language + vision → tokenized joint actions · Diffusion Policy / ACT trajectory heads · trained in Isaac Sim → deployed on real robot · domain randomization bridges the sim-to-real gap
→ CONTROL: joint torques / positions · PID / MPC control · force/impedance control · ROS2 / MoveIt2 · safety constraints

Robot Foundation Models

Model | By | Key Innovation
RT-2 | Google DeepMind | PaLI-X VLM → action tokens. Web knowledge transfers to robots
π0 (pi-zero) | Physical Intelligence | VLA + diffusion action head. SOTA dexterous manipulation
OpenVLA | Stanford/Berkeley | 7B open-source VLA, fine-tunable on your robot data
Helix | Figure AI | Real-time dual-arm VLA on Figure 02 humanoid
UniSim | Google | World model for robot simulation and planning

Humanoid Robot Landscape (2025)

Robot | Company | Status
Optimus Gen 2 | Tesla | Production-line testing, Gigafactory
Figure 02 | Figure AI | Commercial, BMW partnership
NEO Gamma | 1X Technologies | Home assistant, open-data strategy
Atlas | Boston Dynamics | Electric, Hyundai integration
GR-2 | Fourier Intelligence | Mass production, rehab + logistics
Unitree G1 | Unitree | $16K; most affordable humanoid

ROS2 — Robot Operating System 2

Industry-standard middleware. DDS-based pub/sub, real-time capable, cross-platform. Used in autonomous vehicles, surgical robots, warehouse automation.

ROS2 Core Concepts — Node, Topic, Service

Python · rclpy publisher + subscriber
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist

class CameraNode(Node):
    def __init__(self):
        super().__init__("camera_node")
        # Publisher: send camera frames at 30fps
        self.pub = self.create_publisher(Image, "/camera/rgb", 10)
        self.timer = self.create_timer(0.033, self.publish_frame)
        # Subscriber: receive navigation commands
        self.sub = self.create_subscription(
            Twist, "/cmd_vel", self.on_cmd_vel, 10)

    def publish_frame(self):
        msg = Image()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.encoding = "rgb8"
        # msg.data = capture_camera_bytes()
        self.pub.publish(msg)

    def on_cmd_vel(self, msg: Twist):
        self.get_logger().info(
            f"Moving: linear={msg.linear.x:.2f} "
            f"angular={msg.angular.z:.2f}"
        )

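def main():
    rclpy.init()
    node = CameraNode()
    try:
        rclpy.spin(node)   # event-driven loop
    except KeyboardInterrupt:
        pass
    finally:
        # Release the node and the ROS context cleanly on exit
        node.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()

if __name__ == "__main__":
    main()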

MuJoCo Simulation + Domain Randomization

Python · Custom robot env + sim-to-real
import gymnasium as gym
import numpy as np

# MuJoCo built-in robots
env = gym.make("HalfCheetah-v4", render_mode="rgb_array")

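# Domain randomization: key for sim-to-real transfer
# Cache nominal values once so each episode scales from the originals
# instead of compounding on the previous episode's randomization.
_model = env.unwrapped.model
NOMINAL_FRICTION = _model.geom_friction.copy()
NOMINAL_MASS = _model.body_mass.copy()

def randomize_physics(env):
    """Randomize friction and mass each episode.
    A policy trained across these variations is more likely to transfer to real hardware."""
    model = env.unwrapped.model
    # Randomize sliding friction (column 0) by ±50% of nominal;
    # torsional and rolling friction (columns 1-2) are left unchanged
    scale = np.random.uniform(0.5, 1.5, size=model.ngeom)
    model.geom_friction[:, 0] = NOMINAL_FRICTION[:, 0] * scale
    # Randomize body mass by ±20% of nominal (index 0 is the world body)
    model.body_mass[1:] = NOMINAL_MASS[1:] * np.random.uniform(
        0.8, 1.2, size=model.nbody - 1)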

# NVIDIA Isaac Lab: 4096 parallel envs on A100
# isaac_env = gym.make("Isaac-Velocity-Flat-Anymal-C-v0")
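# GPU-parallel simulation: throughput far above a single CPU MuJoCo env
# (the exact speedup depends on task and hardware), with photorealistic rendering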
🎮

Sim stack in 2025: NVIDIA Isaac Lab for GPU-accelerated RL (4096 parallel envs). MuJoCo for manipulation research. PyBullet for quick prototypes. Genesis (2024), a newer GPU-accelerated physics engine built for generative simulation.

Robot Learning — Imitation to Diffusion Policy

ACT — Action Chunking Transformer

Introduced with Stanford's ALOHA system. ACT predicts the next k actions as a chunk rather than one at a time, which reduces compounding errors in dexterous manipulation.

Python · ACT with LeRobot
"""
ACT key ideas:
- CVAE encoder: encode action sequence → style latent z
- Transformer: obs + z → predict chunk of k=100 actions
- Temporal ensembling: average overlapping chunks
- Trained on teleoperation via ALOHA bimanual hardware
- Input: 4 camera views + joint positions
- Output: 100 joint position targets at 50 Hz = 2-second plan
"""
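# Import path varies across LeRobot releases (older ones use lerobot.common.policies...)
from lerobot.policies.act.modeling_act import ACTPolicy

# Pre-trained on HuggingFace Hub
policy = ACTPolicy.from_pretrained(
    "lerobot/act_aloha_sim_transfer_cube_human"
)
policy.eval()
policy.reset()   # clear the internal action queue at episode start

# Inference (img_tensor and joint_positions_tensor are placeholders with a batch dim)
obs = {
    "observation.images.top": img_tensor,        # [B,C,H,W]
    "observation.state": joint_positions_tensor, # [B,14]
}
# select_action returns one action per call; the predicted chunk
# is queued internally and consumed step by step.
action = policy.select_action(obs)   # [B, 14]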
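The temporal ensembling idea listed in the ACT notes can be illustrated without a robot or a trained model. What follows is a minimal sketch, assuming the exponential weighting w_i = exp(-m·i) with i = 0 for the oldest prediction, as described in the ACT paper. The function name `temporal_ensemble`, the `chunks` layout, and the toy numbers are hypothetical and are not part of LeRobot's API.

```python
import math

def temporal_ensemble(chunks, t, m=0.01):
    """Blend every prediction made for timestep t across overlapping chunks.

    chunks: dict {start_step: [action, action, ...]}; each action is a list of
            floats and chunks[start][i] targets timestep start + i.
    m:      decay rate; weights are exp(-m * i) with i = 0 for the OLDEST chunk.
    """
    preds = []
    for start in sorted(chunks):              # oldest chunk first
        offset = t - start
        if 0 <= offset < len(chunks[start]):
            preds.append(chunks[start][offset])
    if not preds:
        raise ValueError(f"no chunk covers timestep {t}")
    weights = [math.exp(-m * i) for i in range(len(preds))]
    total = sum(weights)
    dim = len(preds[0])
    return [sum(w * p[d] for w, p in zip(weights, preds)) / total
            for d in range(dim)]

# Toy example: two 3-step chunks of 1-D actions, predicted at t=0 and t=1.
chunks = {
    0: [[0.0], [1.0], [2.0]],
    1: [[1.2], [2.2], [3.2]],
}
blended = temporal_ensemble(chunks, t=1, m=0.0)   # m=0 gives equal weights
print(blended)
```

With m = 0 both overlapping predictions count equally; a larger m shifts weight toward the older chunk, which smooths the executed trajectory at the cost of reacting more slowly to new observations.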

Diffusion Policy

Model robot actions as a denoising diffusion process. Handles multimodal action distributions naturally — a robot can pick up from left OR right without mode averaging.

Python · Diffusion Policy with LeRobot
"""
Diffusion Policy (Chi et al., 2023):
- Treat action trajectory like an image — learn to denoise it
- DDPM training; a DDIM scheduler can cut inference to roughly 10-16 denoising steps
- U-Net or Transformer denoises, conditioned on observation
- Naturally multimodal: represents all valid grasp modes
- No mode averaging (MSE-regression behavior cloning averages modes → bad actions)
"""
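# Import path varies across LeRobot releases
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy

policy = DiffusionPolicy.from_pretrained("lerobot/diffusion_pusht")
policy.eval()
policy.reset()   # clear observation/action queues at episode start

obs = {
    "observation.image":  img_tensor,    # [B,C,H,W], placeholder
    "observation.state":  state_tensor,  # [B,2], placeholder
}
# Starts from Gaussian noise → denoises → action chunk (queued internally);
# select_action returns one action per call
action = policy.select_action(obs)   # [B, 2]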
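The mode-averaging problem mentioned above is easy to reproduce in one dimension. This is a toy sketch, not a diffusion model: it assumes hypothetical demonstrations that go either left (-1.0) or right (+1.0) and shows that the single action minimizing mean squared error lands between the two modes, where no demonstration exists.

```python
# Toy 1-D "grasp direction" demos: half go left (-1.0), half go right (+1.0).
demos = [-1.0] * 50 + [1.0] * 50

def mse(prediction, targets):
    """Mean squared error of one constant prediction against all targets."""
    return sum((prediction - t) ** 2 for t in targets) / len(targets)

# The constant that minimizes MSE is the mean of the targets.
mse_optimal_action = sum(demos) / len(demos)

# Distance from that action to the nearest demonstrated mode.
distance_to_nearest_mode = min(abs(mse_optimal_action - mode)
                               for mode in (-1.0, 1.0))

print("MSE-optimal action:", mse_optimal_action)
print("distance to nearest demonstrated mode:", distance_to_nearest_mode)
print("MSE at the mean vs. at a real mode:", mse(mse_optimal_action, demos), mse(1.0, demos))
```

A regressor trained with MSE prefers the averaged action because its loss is lower there, even though that action matches neither demonstrated behavior. A policy that samples from the action distribution commits to one mode instead.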
🤗

Start here: LeRobot (HuggingFace) has 100+ teleoperation datasets, pre-trained ACT/DiffusionPolicy, and one-command training for SO-100 and ALOHA robots.

Part VII

Quantum Computing

Qubits, superposition, entanglement — and what quantum actually means for AI. From Qiskit circuits to QAOA optimization and the honest timeline for quantum advantage.

Quantum Fundamentals — Qubits, Gates, Circuits

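Classical bits are 0 or 1. A qubit exists in superposition: α|0⟩ + β|1⟩, where α and β are complex amplitudes with |α|² + |β|² = 1; measuring yields 0 with probability |α|² and 1 with probability |β|². Entanglement correlates qubits so that their joint state cannot be described qubit by qubit. Interference cancels the amplitudes of wrong answers and amplifies correct ones, which is where quantum speedups come from.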

Figure: Bell-state circuit. q₀: |0⟩, H, Rz(θ), measure → 0 or 1. q₁: |0⟩, measure → 0 or 1. Gate legend: Hadamard, CNOT, Rotation, Measure. Bell state created: (|00⟩ + |11⟩) / √2. Entangled: measuring q₀=0 implies q₁=0; |01⟩ and |10⟩ are never observed.

Essential Quantum Gates

Gate | Effect
H (Hadamard) | |0⟩ → (|0⟩+|1⟩)/√2; creates superposition
X (Pauli-X) | |0⟩ ↔ |1⟩; quantum NOT gate
Z (Pauli-Z) | |1⟩ → −|1⟩; phase flip
CNOT | Flips the target if control=|1⟩; creates entanglement when the control is in superposition
T gate | π/4 phase on |1⟩ (historically called the "π/8 gate"); needed for universal QC
Rx/Ry/Rz(θ) | Arbitrary Bloch sphere rotation; used in parameterized circuits
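The H and CNOT entries can be checked by hand with a four-element state vector. The sketch below is a plain-Python illustration, assuming the basis ordering |q₀q₁⟩ = 00, 01, 10, 11; the helper names `apply_h_q0` and `apply_cnot_q0_q1` are made up for this example and do not come from any quantum SDK.

```python
import math

LABELS = ["00", "01", "10", "11"]   # basis order |q0 q1>

def apply_h_q0(state):
    """Hadamard on q0: mixes each |0,q1> amplitude with the matching |1,q1>."""
    s = 1 / math.sqrt(2)
    a00, a01, a10, a11 = state
    return [s * (a00 + a10), s * (a01 + a11),
            s * (a00 - a10), s * (a01 - a11)]

def apply_cnot_q0_q1(state):
    """CNOT with control q0 and target q1: swaps the |10> and |11> amplitudes."""
    a00, a01, a10, a11 = state
    return [a00, a01, a11, a10]

state = [1.0, 0.0, 0.0, 0.0]                    # start in |00>
state = apply_cnot_q0_q1(apply_h_q0(state))     # H on q0, then CNOT

# Born rule: measurement probability is the squared magnitude of the amplitude.
probs = {label: abs(amp) ** 2 for label, amp in zip(LABELS, state)}
print(probs)
```

Only 00 and 11 carry probability, each about one half, which is the Bell state (|00⟩ + |11⟩)/√2 from the diagram.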

Qiskit — Hello Quantum World

Python · Bell State + IBM hardware
from qiskit import QuantumCircuit
from qiskit.primitives import StatevectorSampler

# Build Bell State: (|00⟩ + |11⟩) / √2
qc = QuantumCircuit(2, 2)
qc.h(0)           # superposition on q0
qc.cx(0, 1)       # CNOT: entangle q0 → q1
qc.measure([0, 1], [0, 1])

# Simulate locally
sampler = StatevectorSampler()
counts = sampler.run([qc], shots=1024).result()[0]\
                .data.c.get_counts()
# {'00': ~512, '11': ~512} — never '01' or '10'

print(qc.draw("text"))

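# Run on real IBM quantum hardware (free tier available)
from qiskit import transpile
from qiskit_ibm_runtime import QiskitRuntimeService, SamplerV2
service = QiskitRuntimeService(
    channel="ibm_quantum_platform", token="YOUR_IBM_TOKEN"
)   # older qiskit-ibm-runtime releases used channel="ibm_quantum"
backend = service.least_busy(operational=True, simulator=False)
# Hardware primitives only accept circuits transpiled to the backend's native gates
isa_qc = transpile(qc, backend=backend)
job = SamplerV2(mode=backend).run([isa_qc], shots=1024)
print(job.result()[0].data.c.get_counts())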

Quantum Algorithms — Grover, Shor, VQE, QAOA

Grover's Algorithm — Quadratic Search Speedup

Classical unstructured search: O(N). Grover's: O(√N). For 1M items, that is on the order of 1,000 oracle queries (about 785 Grover iterations) instead of up to 1M classical checks. Works via amplitude amplification: each iteration boosts the marked state's probability.

Python · Grover's search with Qiskit
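from qiskit.circuit.library import PhaseOracle
# qiskit.algorithms was removed in Qiskit 1.0; the code now lives in qiskit-algorithms
from qiskit_algorithms import Grover, AmplificationProblem
# V1 primitive (removed in Qiskit 2.0); match this to your installed versions
from qiskit.primitives import Sampler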

# Oracle marks the target state "11" (boolean: x0 AND x1)
oracle = PhaseOracle("x0 & x1")

problem = AmplificationProblem(
    oracle,
    is_good_state=["11"],  # what we're searching for
)

grover = Grover(sampler=Sampler())
result = grover.amplify(problem)

print(result.top_measurement)    # "11" with high probability
print(result.max_probability)    # close to 1.0

# Optimal iterations ≈ π/4 × √(N/M)
# N = search space size, M = number of solutions
# 2 qubits → N=4, M=1 → 1 iteration optimal
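The iteration formula and the amplitude-amplification loop can be reproduced with a plain list of amplitudes, with no quantum SDK involved. This is a sketch under simplifying assumptions: one marked item, real amplitudes, and hypothetical helper names (`optimal_iterations`, `grover_success_probability`).

```python
import math

def optimal_iterations(n_items, n_marked=1):
    """floor(pi/4 * sqrt(N/M)), the usual near-optimal Grover iteration count."""
    return math.floor(math.pi / 4 * math.sqrt(n_items / n_marked))

def grover_success_probability(n_items, marked, iterations):
    """Simulate Grover on a real amplitude vector and return P(marked)."""
    amps = [1 / math.sqrt(n_items)] * n_items      # uniform superposition
    for _ in range(iterations):
        amps[marked] = -amps[marked]               # oracle: phase-flip the marked item
        mean = sum(amps) / n_items
        amps = [2 * mean - a for a in amps]        # diffusion: reflect about the mean
    return amps[marked] ** 2

for n in (4, 64, 1024):
    k = optimal_iterations(n)
    p = grover_success_probability(n, marked=n - 1, iterations=k)
    print(f"N={n}: {k} iterations, success probability {p:.4f}")
```

Running more iterations than the optimum rotates the state past the target and lowers the success probability again, which is why the iteration count has to be chosen from N and M rather than simply maximized.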

Shor's Algorithm — RSA Threat

Factors an integer N in time polynomial in its bit length, roughly O((log N)³). Breaks RSA, DSA, and ECDSA. Requires fault-tolerant hardware, by common estimates millions of physical qubits, and is typically projected to be 10-20 years away.

Concept · Shor's structure + post-quantum
"""
Shor's algorithm outline:
1. Choose random a < N with gcd(a, N) = 1
2. Find period r of f(x) = a^x mod N
   - Uses Quantum Phase Estimation (QPE) + QFT
   - This is the O(log³ N) quantum speedup step
3. If r is even and a^(r/2) ≢ -1 (mod N):
   gcd(a^(r/2) ± 1, N) gives a factor

Why it breaks RSA:
- RSA-2048 relies on factoring being classically hard
- Classical best (GNFS): O(exp(c · n^(1/3) · (log n)^(2/3))), n = bit length; sub-exponential
- Shor's: O(n³), polynomial; a superpolynomial speedup
- Also breaks ECDSA (elliptic curve) and DSA

Post-quantum crypto (safe from Shor's), NIST 2024:
  CRYSTALS-Kyber  → key encapsulation ✅ standardized (FIPS 203, ML-KEM)
  CRYSTALS-Dilithium → digital signatures ✅ standardized (FIPS 204, ML-DSA)
  SPHINCS+        → hash-based signatures ✅ standardized (FIPS 205, SLH-DSA)

AES-256 remains quantum-safe (Grover halves effective key strength → 128 bits)
"""

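The classical half of Shor's outline (steps 1 and 3) fits in a few lines. In this sketch the period is found by brute force, which is exactly the step the quantum computer is meant to replace, so it only works for tiny N. The function names `find_period` and `shor_classical_part` are illustrative, not taken from a library.

```python
import math

def find_period(a, n):
    """Smallest r > 0 with a**r % n == 1 (brute force stand-in for QPE)."""
    r, value = 1, a % n
    while value != 1:
        value = (value * a) % n
        r += 1
    return r

def shor_classical_part(n, a):
    """Return a pair of nontrivial factors of n from base a, or None if a is a bad choice."""
    g = math.gcd(a, n)
    if g != 1:
        return tuple(sorted((g, n // g)))   # lucky guess: a already shares a factor
    r = find_period(a, n)
    if r % 2 == 1:
        return None                         # odd period: pick another a
    half = pow(a, r // 2, n)                # a^(r/2) mod n
    if half == n - 1:
        return None                         # a^(r/2) ≡ -1 (mod n): pick another a
    return tuple(sorted((math.gcd(half - 1, n), math.gcd(half + 1, n))))

print("period of 7 mod 15:", find_period(7, 15))
print("factors of 15 using a=7:", shor_classical_part(15, 7))
print("a=14 is a bad base for 15:", shor_classical_part(15, 14))
```

For N = 15 and a = 7 the period is 4, so a^(r/2) = 49 ≡ 4 (mod 15) and the two gcds are gcd(3, 15) and gcd(5, 15).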
VQE — Quantum Chemistry (Near-Term)

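The Variational Quantum Eigensolver estimates the ground-state energy of molecules with a hybrid classical-quantum loop. Chemistry is widely seen as a likely first domain for practical quantum advantage (drug discovery, materials science), though none has been demonstrated yet.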

Python · VQE for H₂ molecule
from qiskit_nature.second_q.drivers import PySCFDriver
from qiskit_nature.second_q.mappers import JordanWignerMapper
from qiskit_algorithms import VQE
from qiskit_algorithms.optimizers import SLSQP
from qiskit.circuit.library import TwoLocal
from qiskit.primitives import Estimator

# Hydrogen molecule at equilibrium bond length
driver = PySCFDriver(atom="H .0 .0 .0; H .0 .0 0.735",
                     basis="sto3g")
problem = driver.run()

# Map fermionic Hamiltonian → qubit operators
qubit_op = JordanWignerMapper().map(problem.second_q_ops()[0])

# Parameterized ansatz: trial wave function
ansatz = TwoLocal(rotation_blocks="ry",
                  entanglement_blocks="cz", reps=2)

# Hybrid loop: quantum circuit → classical optimizer → repeat
result = VQE(Estimator(), ansatz, SLSQP())\
             .compute_minimum_eigenvalue(qubit_op)
# Electronic energy only; add the nuclear repulsion energy for the total
print(f"H₂ electronic ground state: {result.eigenvalue.real:.6f} Hartree")
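The hybrid loop is easier to see on a problem small enough to compute by hand. The sketch below is a toy stand-in, not a chemistry calculation: it assumes a made-up single-qubit Hamiltonian H = Z + 0.5·X, a one-parameter RY ansatz, and plain gradient descent as the classical optimizer. On real hardware the `energy` function would be estimated from circuit measurements.

```python
import math

# Toy single-qubit Hamiltonian H = Z + 0.5*X as a real 2x2 matrix (hypothetical).
H = [[1.0, 0.5],
     [0.5, -1.0]]

def ansatz_state(theta):
    """RY(theta)|0> = [cos(theta/2), sin(theta/2)] (real amplitudes)."""
    return [math.cos(theta / 2), math.sin(theta / 2)]

def energy(theta):
    """Expectation value <psi|H|psi>; this is the 'quantum' step of the loop."""
    psi = ansatz_state(theta)
    h_psi = [sum(H[i][j] * psi[j] for j in range(2)) for i in range(2)]
    return sum(psi[i] * h_psi[i] for i in range(2))

def minimize_energy(theta=0.1, lr=0.2, steps=300, eps=1e-5):
    """Classical step: gradient descent with a finite-difference gradient."""
    for _ in range(steps):
        grad = (energy(theta + eps) - energy(theta - eps)) / (2 * eps)
        theta -= lr * grad
    return theta, energy(theta)

theta_opt, e_min = minimize_energy()
exact = -math.sqrt(1.0 + 0.5 ** 2)   # lowest eigenvalue of H, for comparison
print(f"variational minimum: {e_min:.6f}, exact ground energy: {exact:.6f}")
```

The variational principle guarantees the ansatz energy never drops below the true ground-state energy, so the optimizer can only approach it from above. Here the ansatz is expressive enough to reach it.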

QAOA — Combinatorial Optimization

Quantum Approximate Optimization Algorithm. Targets NP-hard problems (MaxCut, TSP, scheduling). Runs on today's NISQ hardware at small problem sizes, with no demonstrated advantage over classical solvers so far.

Python · QAOA for MaxCut
from qiskit_optimization.problems import QuadraticProgram
from qiskit_optimization.algorithms import MinimumEigenOptimizer
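from qiskit_algorithms import QAOA
from qiskit_algorithms.optimizers import COBYLA
from qiskit.primitives import Sampler

# MaxCut: partition graph to maximize edges crossing the cut
qp = QuadraticProgram()
qp.binary_var_list(4)   # 4 nodes
# Edge (i, j) is cut when x_i != x_j, i.e. x_i + x_j - 2*x_i*x_j = 1.
# Minimize the negated cut size over edges (0,1), (1,2), (2,3), (0,2).
qp.minimize(linear=[-2, -2, -3, -1],
            quadratic={(0,1): 2, (1,2): 2, (2,3): 2, (0,2): 2})

# QAOA with p=2 layers (more layers → better approximation)
# QAOA needs a classical optimizer for its variational parameters
qaoa = QAOA(sampler=Sampler(), optimizer=COBYLA(), reps=2)
result = MinimumEigenOptimizer(qaoa).solve(qp)

print(result.x)      # e.g. [0,1,0,1], one optimal partition
print(result.fval)   # objective value = -(number of cut edges); optimum is -3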
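For a graph this small, a classical brute-force check gives the ground truth to compare any QAOA result against. The sketch below uses the same four edges as the example and assumes the standard MaxCut encoding, where edge (i, j) contributes x_i + x_j - 2·x_i·x_j; the helper names are illustrative.

```python
from itertools import product

EDGES = [(0, 1), (1, 2), (2, 3), (0, 2)]   # the 4-node graph from the example
N_NODES = 4

def cut_size(assignment, edges=EDGES):
    """Number of edges whose endpoints fall on different sides of the partition."""
    return sum(1 for i, j in edges if assignment[i] != assignment[j])

def qubo_value(assignment, edges=EDGES):
    """Minimization objective: -(x_i + x_j - 2*x_i*x_j) summed over edges."""
    return sum(-(assignment[i] + assignment[j]
                 - 2 * assignment[i] * assignment[j]) for i, j in edges)

# Enumerate all 2^4 partitions and keep the one with the largest cut.
all_assignments = list(product([0, 1], repeat=N_NODES))
best = max(all_assignments, key=cut_size)

print("best partition:", best, "cut size:", cut_size(best))
print("QUBO value at best:", qubo_value(best))
print("cut size of [0, 1, 0, 1]:", cut_size((0, 1, 0, 1)))
```

Nodes 0, 1, and 2 form a triangle, and at most two edges of a triangle can be cut, so with the extra edge (2, 3) the best possible cut is 3. Several partitions reach it, so a solver may return any of them.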

Quantum Machine Learning — Honest Assessment

Quantum Neural Networks with PennyLane

Parameterized quantum circuits as differentiable layers. Gradients come from the parameter-shift rule on hardware, or from ordinary backprop on simulators. Integrates with PyTorch or JAX.

Python · QNN hybrid model in PyTorch
import pennylane as qml
import torch

n_qubits = 4
dev = qml.device("default.qubit", wires=n_qubits)

@qml.qnode(dev, interface="torch")
def quantum_circuit(inputs, weights):
    # Encode classical data into quantum state
    qml.AngleEmbedding(inputs, wires=range(n_qubits))
    # Parameterized entangling layers
    qml.BasicEntanglerLayers(weights, wires=range(n_qubits))
    # Measure: return expectation values
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

# Wrap as a PyTorch layer
qlayer = qml.qnn.TorchLayer(
    quantum_circuit,
    weight_shapes={"weights": (3, n_qubits)}
)

# Hybrid classical → quantum → classical model
model = torch.nn.Sequential(
    torch.nn.Linear(8, n_qubits),  # classical encoder
    qlayer,                         # quantum processing
    torch.nn.Linear(n_qubits, 2),  # classical decoder
)
# Train with standard PyTorch optimizer + backprop
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
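The parameter-shift rule behind these gradients can be verified on a single qubit without PennyLane. This is a minimal sketch, assuming one RY(θ) gate applied to |0⟩ and a Pauli-Z measurement, for which the expectation value is cos θ; the function names are hypothetical.

```python
import math

def expval_z(theta):
    """<Z> after RY(theta)|0>: the state is [cos(theta/2), sin(theta/2)]."""
    c, s = math.cos(theta / 2), math.sin(theta / 2)
    return c * c - s * s          # equals cos(theta)

def parameter_shift_grad(f, theta, shift=math.pi / 2):
    """Gradient from two shifted circuit evaluations (valid for Pauli rotations)."""
    return (f(theta + shift) - f(theta - shift)) / (2 * math.sin(shift))

theta = 0.7
grad = parameter_shift_grad(expval_z, theta)
print("parameter-shift gradient:", grad)
print("analytic gradient -sin(theta):", -math.sin(theta))
```

Unlike a finite-difference estimate, the two evaluations are far apart (±π/2), so the result is exact in principle and less sensitive to shot noise on real hardware.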

Quantum ML — Timeline & Honest Take

Technique | Status | When It Matters
VQE (chemistry) | Near-term real use | <5 yr: drug discovery
QAOA (optimization) | NISQ-era, niche | <5 yr: scheduling
Quantum kernels/SVM | Research scale | Advantage unproven on real data
QNN / PQC | Research | Barren plateau problem unsolved
Shor's (RSA break) | Requires fault-tolerant | 10-20 years
Grover's (ML speedup) | Theoretical | Needs fault-tolerant QC

Hardware | Qubits (2025) | SDK
IBM Quantum | 1,121 (Condor); 156 (Heron r2) | Qiskit
Google Quantum AI | 105 (Willow) | Cirq
Quantinuum | 56 (H2-1) | pytket
IonQ | 36 (Forte) | ionq SDK
PsiQuantum | ~1M by 2027? (target) | Photonic (hardware approach)
⚛️

Honest quantum ML take for 2025: No quantum ML method has yet shown an advantage over classical ML on a real-world benchmark; the "quantum advantage" for ML is unproven. VQE for molecular simulation and QAOA for small combinatorial problems are the most promising near-term directions. Learn Qiskit now. If fault-tolerant quantum hardware arrives (optimistic roadmaps point to roughly 2030-2035), the engineers who understand both ML and quantum algorithms will be positioned to design the next generation of models. That intersection is tiny and extremely valuable.

🏆 Complete Learning Map — All 7 Parts

Domain | Foundation | Production | Frontier
RAG | Basic pipeline, chunking, embeddings | HyDE, GraphRAG, RAGAS eval | Agentic RAG, multimodal, self-RAG
LLMs | Transformer internals, attention | LoRA fine-tuning, vLLM serving | DPO, GRPO, test-time compute
Agents | ReAct, tool calling, MCP | LangGraph multi-agent, HITL | Multi-agent, sub-graphs, CrewAI
Computer Vision | CNNs, ViT, CLIP | YOLOv8, SAM, diffusers | Video gen (DiT), multimodal VLMs
Speech/Audio | Whisper ASR, TTS basics | Real-time voice agent pipelines | GPT-4o audio, MusicGen
RL | MDP, Q-learning, PPO | RLHF pipeline, DPO | GRPO, Constitutional AI
Cloud / MLOps | AWS S3/EC2/Lambda | ECS, W&B, MLflow | GPU clusters, feature stores
Robotics | ROS2 basics, MuJoCo sim | ACT, Diffusion Policy, LeRobot | VLA models (RT-2, π0), humanoids
Quantum | Qubits, gates, Qiskit | VQE, QAOA, Grover's | QNN (PennyLane), fault-tolerant