Agentic Workflows

in LlamaIndex

2025-04-21 Microsoft AI Agents Hackathon

What are we talking about?

  • What is LlamaIndex
  • What is an Agent?
  • What is RAG?
  • Common agentic patterns
  • Building agentic RAG in LlamaIndex.TS
    • Chaining
    • Routing
    • Parallelization
    • Self-reflection
    • Full stack web app
    • ...with agents
    • ...with RAG!

What is LlamaIndex?

Python: docs.llamaindex.ai

TypeScript: ts.llamaindex.ai

LlamaParse

World's best parser of complex documents

Free for 10000 pages/month!

cloud.llamaindex.ai

LlamaCloud

Turn-key RAG API for Enterprises

Available as SaaS or private cloud deployment

Sign up at cloud.llamaindex.ai

Why LlamaIndex?

  • Build faster
  • Skip the boilerplate
  • Avoid early pitfalls
  • Get best practices for free
  • Go from prototype to production

What can you build in LlamaIndex?

  • Lots of stuff! Especially...
  • AI agents
  • RAG

What is an agent?

Semi-autonomous software

that uses tools to accomplish a goal

Agentic programming is a new paradigm

When does an agent make sense?

When your data is messy, which is most of the time

LLMs are good at turning lots of text into less text

LLMs work well under the hood

You don't have to build a chatbot

LLMs need data

The solution is RAG

RAG = infinite context

Agents need RAG

and RAG needs agents

Building Effective Agents

  • Chaining
  • Routing
  • Parallelization
  • Orchestrator-Workers
  • Evaluator-Optimizer

Chaining

Routing

Parallelization

Parallelization: flavor 1

Sectioning

Parallelization: flavor 2

Voting

Orchestrator-Workers

Evaluator-Optimizer

aka Self-reflection

Arbitrary complexity

Workflows

The building blocks of agents in LlamaIndex

Setting up

Install:

npm install tsx -g
npm install @llama-flow/core

package.json:

{
  "type": "module"
}

Imports:

import { createWorkflow, workflowEvent } from "@llama-flow/core";
import { pipeline } from "node:stream/promises";

Run examples:

tsx my_file.ts

Define events,

create workflow

const startEvent = workflowEvent<string>();
const secondEvent = workflowEvent<string>();
const stopEvent = workflowEvent<string>();

const workflow = createWorkflow();

Create step handlers

// handle the start event
workflow.handle([startEvent], (start) => {
    console.log(`Started the workflow with input: ${start.data}`);
    return secondEvent.with(start.data);
});

// handle the second event
workflow.handle([secondEvent], (second) => {
    console.log(`Second event with input: ${second.data}`);
    return stopEvent.with(second.data);
});

Running a workflow

const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with("I am some data"));

Processing workflow output

// Process the stream to get the result
const result = await pipeline(stream, async function (source) {
    for await (const event of source) {
        console.log(`Event: ${event.data}`);
        if (stopEvent.include(event)) {
            return `Result: ${event.data}`;
        }
    }
});

console.log(result)

Routing

const startEvent = workflowEvent<string>();
const branchA1Event = workflowEvent<string>();
const branchA2Event = workflowEvent<string>();
const branchB1Event = workflowEvent<string>();
const branchB2Event = workflowEvent<string>();
const stopEvent = workflowEvent<string>();

Choosing a branch

workflow.handle([startEvent], (start) => {
  // Randomly choose between branch A and B
  if (Math.random() < 0.5) {
    return branchA1Event.with("Chose branch A");
  } else {
    return branchB1Event.with("Chose branch B");
  }
});

Two chains

// handle branch A
workflow.handle([branchA1Event], (branchA1) => {
  return branchA2Event.with(branchA1.data);
});

workflow.handle([branchA2Event], (branchA2) => {
  return stopEvent.with(branchA2.data);
});

// handle branch B
workflow.handle([branchB1Event], (branchB1) => {
  return branchB2Event.with(branchB1.data);
});

workflow.handle([branchB2Event], (branchB2) => {
  return stopEvent.with(branchB2.data);
});

Sectioning

import { collect } from "@llama-flow/core/stream/consumer";
import { until } from "@llama-flow/core/stream/until";
import { filter } from "@llama-flow/core/stream/filter";
import { getContext } from "@llama-flow/core";

Sectioning events

const startEvent = workflowEvent<string>();
const branchAEvent = workflowEvent<string>();
const branchBEvent = workflowEvent<string>();
const branchCEvent = workflowEvent<string>();
const branchCompleteEvent = workflowEvent<string>();
const allCompleteEvent = workflowEvent<string>();
const stopEvent = workflowEvent<string>();

Emitting multiple events

workflow.handle([startEvent], async (start) => {
  // emit 3 different events, handled separately
  const { sendEvent, stream } = getContext();
  sendEvent(branchAEvent.with("Branch A"));
  sendEvent(branchBEvent.with("Branch B"));
  sendEvent(branchCEvent.with("Branch C"));

Sectioning steps

workflow.handle([branchAEvent], (branchA) => {
  // do something here
  return branchCompleteEvent.with(branchA.data);
});

Collecting events

  // still inside the startEvent handler: wait until all three branches report back
  let condition = 0;
  const results = await collect(
    until(
      filter(stream, (ev) => branchCompleteEvent.include(ev)),
      () => {
        condition++;
        return condition === 3;
      },
    ),
  );

  console.log(`All branches completed`);
  return allCompleteEvent.with(results.join(", "));
});

Voting

is implemented the same way as sectioning
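
Since voting reuses the same fan-out and collection as sectioning, the only new piece is tallying the collected answers. Here is a minimal sketch of that tally in plain TypeScript. `majorityVote` is a hypothetical helper, not part of @llama-flow/core, and it assumes each branch returns a short string answer.

```typescript
// Hypothetical helper (not part of @llama-flow/core): pick the most common
// answer from the results gathered by the collect step.
function majorityVote(votes: string[]): string | undefined {
  const counts = new Map<string, number>();
  for (const vote of votes) {
    const key = vote.trim().toLowerCase(); // normalize so "Yes " and "yes" match
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }
  let winner: string | undefined;
  let best = 0;
  for (const [answer, count] of counts) {
    if (count > best) {
      // strict ">" keeps the earliest answer when there is a tie
      winner = answer;
      best = count;
    }
  }
  return winner;
}

console.log(majorityVote(["yes", "No", "Yes "])); // prints "yes"
```

In the collecting handler, the `results.join(", ")` call could then be swapped for something like `majorityVote(results.map((r) => r.data)) ?? ""`.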

Using an LLM

In .env.local:

ANTHROPIC_API_KEY=xxxxx

Imports for .env.local:

import { config } from 'dotenv';
config({ path: '.env.local' });

Imports for Anthropic:

import { Anthropic } from "@llamaindex/anthropic";

Orchestrator-worker

const startEvent = workflowEvent<string>();
const subquestionEvent = workflowEvent<string>();
const questionAnsweredEvent = workflowEvent<string>();
const synthesizeEvent = workflowEvent<{ answers: string; question: string }>();
const stopEvent = workflowEvent<string>();

Initialize the LLM

// initialize the LLM
const llm = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

Split up the question

let prompt = `We have been asked a complicated question: 
  <question>${start.data}</question>.
  Split it up into a few different questions 
  that are easier to answer.
  Return the questions with one question per line.
  Do not include any other text, preamble or 
  explanation in your response.
`
let result = await llm.complete({prompt:prompt})

// split up the result into an array of questions
let questions = result.text.split("\n")
  .map(q => q.trim())
  .filter(q => q !== "");
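
Models sometimes number or bullet their lines even when told not to. Here is a slightly more defensive version of the split step, as a sketch. `parseQuestions` is a hypothetical helper name, and the list markers it strips are an assumption about typical model output.

```typescript
// Hypothetical helper: turn raw LLM output into a clean list of questions.
function parseQuestions(text: string): string[] {
  return text
    .split(/\r?\n/)
    // strip leading list markers such as "1.", "2)", "-", "*" or "•";
    // the marker must be followed by whitespace, so "3.5 percent" is left alone
    .map((line) => line.replace(/^\s*(?:\d+[.)]|[-*•])\s+/, "").trim())
    .filter((line) => line !== "");
}

console.log(
  parseQuestions("1. What is RAG?\n\n- What is an agent?\n  Why combine them?  "),
);
// prints [ 'What is RAG?', 'What is an agent?', 'Why combine them?' ]
```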

Answering each question

workflow.handle([subquestionEvent], async (subquestion) => {
  console.log(`Answering sub-question: ${subquestion.data}`);

  let prompt = `Answer the question: 
    <question>${subquestion.data}</question>.
    Return the answer as a short answer.
  `
  let result = await llm.complete({prompt:prompt})
  console.log(`Answer: ${result.text}`);
  return questionAnsweredEvent.with(result.text)
});

Emitting and collecting

// emit the questions
const { sendEvent, stream } = getContext();
for (let question of questions) {
  sendEvent(subquestionEvent.with(question));
}

// get all the answers to the questions
let condition = 0;
const results = await collect(
  until(
    filter(stream, (ev) => questionAnsweredEvent.include(ev)),
    () => {
      condition++;
      return condition === questions.length;
    },
  ),
);

Bundling answers

let answers = results.map(r => r.data)
return synthesizeEvent.with({
  answers: answers.join("\n"),
  question: start.data
});

Synthesis

// handle the collected results
workflow.handle([synthesizeEvent], async (synthesize) => {

  let prompt = `You were given the complicated question
    ${synthesize.data.question}. 
    We split it into multiple simpler questions. 
    The answers to the questions are: 
    <answers>${synthesize.data.answers}</answers>.`
  
  let result = await llm.complete({prompt:prompt})  
  return stopEvent.with(result.text);
});

Evaluator-optimizer

aka Self-reflection

Looping syntax

workflow.handle([stepCEvent], async (stepC) => {
  console.log("Step C, data is ", stepC.data)
  if (stepC.data.counter < 10) {
    console.log("Need to loop")
    return stepAEvent.with(stepC.data);
  } else {
    return stopEvent.with(stepC.data);
  }
});

Real-world web apps

Setting up

Create Next.js app:

npx create-next-app

In repo:

7_browser/src/app/page.tsx

Basic React app

const [updates, setUpdates] = useState<string[]>([]);
const [isComplete, setIsComplete] = useState(false);

...

return (
  <div>
    <h2>Streaming Updates</h2>
    <ul>
      {updates.map((update, i) => (
        <li key={i}>{update}</li>
      ))}
    </ul>
    {isComplete && <div>Process complete!</div>}
  </div>
);

In-browser workflow

const startEvent = workflowEvent<void>();
const updateEvent = workflowEvent<string>();
const completeEvent = workflowEvent<void>();

const workflow = createWorkflow();	

Synthetic events

// Simulate async updates
const intervals = [
  setTimeout(() => sendEvent(updateEvent.with("First update")), 500),
  setTimeout(() => sendEvent(updateEvent.with("Second update")), 1000),
  setTimeout(() => sendEvent(updateEvent.with("Final update")), 1500),
  setTimeout(() => sendEvent(completeEvent.with()), 2000)
];

Handling streams in browser

// Process events
const processEvents = async () => {
  for await (const event of stream) {
    if (updateEvent.include(event)) {
      setUpdates(prev => [...prev, event.data]);
    } else if (completeEvent.include(event)) {
      setIsComplete(true);
      break;
    }
  }
};

Server-side full-stack React app

In repo:

8_server/src/app/page.tsx
8_server/src/app/api/run-workflow/route.ts

 

Client-side code

useEffect(() => {
  const eventSource = new EventSource('/api/run-workflow');

  eventSource.onmessage = (event) => {
    console.log("Got a new event", event.data)
    setEvents(prev => [...prev, event.data]);
  };

  eventSource.onerror = () => {
    eventSource.close();
  };

  return () => {
    eventSource.close();
  };
}, []);
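
The endpoint in this deck sends a plain-text greeting first and JSON-encoded event data after that, so `event.data` arrives as a string in both cases. Here is a small sketch of decoding it on the client. `parseEventData` is a hypothetical helper, not something from the repo.

```typescript
// Hypothetical helper: decode one SSE payload received by the client.
// Falls back to the raw string when the payload is not valid JSON
// (for example the initial "Connected to workflow" message).
function parseEventData(raw: string): unknown {
  try {
    return JSON.parse(raw);
  } catch {
    return raw;
  }
}

console.log(parseEventData('"First update"'));        // prints First update
console.log(parseEventData("Connected to workflow")); // prints Connected to workflow
```

Inside `onmessage`, `setEvents` could then store `parseEventData(event.data)` instead of the raw string.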

Endpoint route

export async function GET() {
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    async start(controller) {
      // everything happens here
    },
  });

  return new NextResponse(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
} 

Event sources

// Send initial connection message
controller.enqueue(
  encoder.encode('data: Connected to workflow\n\n')
);

// initialize the anthropic client
const llm = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

Architecture

Events

const startEvent = workflowEvent<{
  topic:string|null,
  research:string,
  feedback:string|null
}>();
const writeEvent = workflowEvent<string>();
const reviewEvent = workflowEvent<string>();
const stopEvent = workflowEvent<string>();

Context store

import { withStore } from "@llama-flow/core/middleware/store";

const workflow = withStore(()=>({
  topic: null as string | null,
  research: "" as string,
  feedback: null as string | null
}),createWorkflow());

Researching

workflow.handle([startEvent], async (start) => {
  // store the topic in case we need to loop
  workflow.getStore().topic = start.data.topic
  let prompt = `What do you know about the topic 
    <topic>${start.data.topic}</topic>? Be very brief.`

  // if we've previously looped, include the feedback
  if (start.data.feedback) {
    prompt += `\nYou have researched this topic before. 
       The feedback you got to research it more is: 
       <feedback>${start.data.feedback}</feedback>`
  }

  // get the research
  let result = await llm.complete({prompt:prompt})
  return writeEvent.with(result.text)
});

Writing

// with the research, store it and try writing a post
workflow.handle([writeEvent], async (write) => {
  console.log(`Writing post, data is ${write.data}`)

  // store the research; this may not be the first time around
  workflow.getStore().research += `<research>${write.data}</research>`

  // try writing a post
  let result = await llm.complete({prompt:`We have done some 
    research about a topic. Write a post about it. Use ONLY 
    the research data provided, nothing from your training 
    data, even if that results in a very short post. 
    ${workflow.getStore().research}`})
  return reviewEvent.with(result.text)
});

Reflecting

// review the post
workflow.handle([reviewEvent], async (review) => {
  let result = await llm.complete({
    prompt:`We have written a post about a topic. 
    Review it for any errors or missing information. 
    <post>${review.data}</post>. If it is good, return 
    the string "good". If it requires more research, 
    return some feedback on what to research next.`})

  // if the post is good, stop (tolerate whitespace and capitalization in the reply)
  if (result.text.trim().toLowerCase() === "good") {
    return stopEvent.with(review.data)
  } else {
    // otherwise, loop with the feedback
    return startEvent.with({
      topic: workflow.getStore().topic,
      research: workflow.getStore().research,
      feedback: result.text
    })
  }
});
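
Because the reviewer decides when to stop, a loop like this can in principle run forever. Here is a minimal sketch of the same evaluator-optimizer shape with an iteration cap, in plain TypeScript. `refineUntilGood`, `maxRounds` and the stub functions are hypothetical names, and the stubs stand in for the `llm.complete` calls.

```typescript
// Hypothetical types: any async text generator and reviewer will do.
type Generate = (feedback: string | null) => Promise<string>;
type Review = (draft: string) => Promise<string>; // resolves to "good" or feedback

// Evaluator-optimizer loop with a hard cap on the number of rounds.
async function refineUntilGood(
  generate: Generate,
  review: Review,
  maxRounds = 3,
): Promise<{ draft: string; rounds: number; approved: boolean }> {
  let feedback: string | null = null;
  let draft = "";
  for (let round = 1; round <= maxRounds; round++) {
    draft = await generate(feedback);
    const verdict = (await review(draft)).trim();
    if (verdict.toLowerCase() === "good") {
      return { draft, rounds: round, approved: true };
    }
    feedback = verdict; // loop again with the reviewer's feedback
  }
  // cap reached: return the latest draft rather than looping forever
  return { draft, rounds: maxRounds, approved: false };
}

// Stubs standing in for LLM calls: the reviewer approves once feedback was used.
const stubGenerate: Generate = async (feedback) =>
  feedback ? "draft with more detail" : "first draft";
const stubReview: Review = async (draft) =>
  draft.includes("more detail") ? "good" : "add more detail";

refineUntilGood(stubGenerate, stubReview).then((result) => console.log(result));
```

In the workflow version, the same cap could be kept as a counter in the context store and checked before returning `startEvent` again.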

Running the workflow

// Process the stream to get the result
pipeline(stream, async function (source) {
  for await (const event of source) {
    console.log("Got a new event", event.data)
    controller.enqueue(
      encoder.encode("data: " + JSON.stringify(event.data)+"\n\n"));  
    if (stopEvent.include(event)) {
      controller.close();
      break; // nothing more can be enqueued once the controller is closed
    }
  }
});
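
Each server-sent message is a `data:` line followed by a blank line. Here is a small sketch that wraps that framing in one place. `sseFrame` is a hypothetical helper, not part of Next.js or LlamaIndex.

```typescript
// Hypothetical helper: encode any JSON-serializable payload as one SSE message.
const encoder = new TextEncoder();

function sseFrame(payload: unknown): Uint8Array {
  // JSON.stringify escapes newlines, so the payload always fits on a single
  // "data:" line; undefined (e.g. from a void event) is sent as null.
  // The trailing blank line ("\n\n") marks the end of the message.
  return encoder.encode(`data: ${JSON.stringify(payload ?? null)}\n\n`);
}

const frame = sseFrame("line one\nline two");
console.log(new TextDecoder().decode(frame));
```

With this helper the loop body would call `controller.enqueue(sseFrame(event.data))`.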

Recap!

  • Agentic patterns
  • Implemented as Workflows
  • In a browser
  • On a server

Adding agents

Wait, weren't we building agents already?

Setting up

Install:

npm install llamaindex

New imports:

import {
    agent,
    tool,
    Settings,
} from "llamaindex";
import { z } from "zod";

Creating a tool

const sumNumbers = ({ a, b }: { a: number; b: number }) => {
    return `${a + b}`;
};

Preparing a tool

const addTool = tool({
    name: "sumNumbers",
    description: "Use this function to sum two numbers",
    parameters: z.object({
        a: z.number({
            description: "First number to sum",
        }),
        b: z.number({
            description: "Second number to sum",
        }),
    }),
    execute: sumNumbers,
});

Instantiate an agent

Settings.llm = new Anthropic({
    apiKey: process.env.ANTHROPIC_API_KEY,
    model: "claude-3-7-sonnet-latest",
});
const tools = [addTool];
const myAgent = agent({ tools });

Agents in a workflow

// handle the start event
workflow.handle([startEvent], async (start) => {
    console.log(`Started the workflow 
      with question: ${start.data}`);

    const response = await myAgent.run(start.data);
    return stopEvent.with(response.data.result);
});

Basic RAG pipeline

Setting up

Install:

npm install @llamaindex/huggingface
npm install @llamaindex/readers

New imports:

import { VectorStoreIndex } from "llamaindex";
import { HuggingFaceEmbedding } from "@llamaindex/huggingface";
import { SimpleDirectoryReader } from "@llamaindex/readers/directory";

Loading

const reader = new SimpleDirectoryReader();
const documents = await reader.loadData("./data");

Embedding

Settings.embedModel = new HuggingFaceEmbedding({
    modelType: "BAAI/bge-small-en-v1.5",
    quantized: false,
});

Storing

const index = await VectorStoreIndex.fromDocuments(documents);

Retrieving

const retriever = await index.asRetriever();
retriever.similarityTopK = 10;
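
Conceptually, a retriever with `similarityTopK = 10` returns the ten chunks whose embeddings sit closest to the query embedding. Here is a toy sketch of that idea, assuming cosine similarity and hand-written two-dimensional vectors. `Embedded`, `cosineSimilarity` and `topK` are hypothetical names for illustration, not the LlamaIndex API, and the real index uses its own vector store.

```typescript
// Hypothetical types and helpers illustrating similarity search.
type Embedded = { text: string; vector: number[] };

function cosineSimilarity(a: number[], b: number[]): number {
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  const denom = Math.sqrt(normA) * Math.sqrt(normB);
  return denom === 0 ? 0 : dot / denom; // guard against zero vectors
}

// Return the k chunks most similar to the query, best match first.
function topK(query: number[], chunks: Embedded[], k: number): Embedded[] {
  return [...chunks] // copy so the caller's array is not reordered
    .sort(
      (x, y) =>
        cosineSimilarity(query, y.vector) - cosineSimilarity(query, x.vector),
    )
    .slice(0, k);
}

const chunks: Embedded[] = [
  { text: "parks budget", vector: [1, 0] },
  { text: "police budget", vector: [0, 1] },
  { text: "parks and recreation", vector: [0.9, 0.1] },
];
console.log(topK([1, 0], chunks, 2).map((c) => c.text));
// prints [ 'parks budget', 'parks and recreation' ]
```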

Creating a query tool

const queryTool = index.queryTool({
    metadata: {
        name: "san_francisco_budget_tool",
        description: `This tool can answer detailed questions 
          about the individual components of the budget of 
          San Francisco in 2023-2024.`,
    },
    retriever: retriever,
})

Giving RAG to an agent

const tools = [addTool, queryTool];
const myAgent = agent({ tools });

Wow!

What can't it do? Well...

RAG limitations

Summarization

Solve it with: routing, parallelization

Comparison

Solve it with: parallelization

Multi-part questions

Solve it with: chaining, parallelization

What's next?

More workflow docs:

Thanks!

Follow me on Bluesky:

@seldo.com

Knowledge-augmented agents with LlamaIndex.TS

By Laurie Voss
