Overview

The Graph feature in Upsonic allows you to create complex workflows by connecting multiple AI Tasks together. Tasks can be executed sequentially or in parallel, with outputs from one task flowing into subsequent tasks. This enables building sophisticated AI pipelines where each task builds upon the results of previous ones.

Creating a Graph

A Graph is the main structure that manages task execution and workflow. It handles task scheduling, execution order, and state management between tasks.

from upsonic import Agent, Task
from upsonic import Graph

# Create a default agent for the graph
agent = Agent("Test Agent")

# Create a graph with the agent as default
graph = Graph(default_agent=agent)

You can configure a graph with several options:

  • default_agent: The default agent to use for tasks that don’t specify their own
  • parallel_execution: Whether to execute independent tasks in parallel
  • max_parallel_tasks: Maximum number of tasks to execute in parallel (default: 4)
  • show_progress: Whether to display a progress bar during execution (default: True)

Creating Task Nodes

Task nodes are the building blocks of a graph. Each node contains a Task object and connects to other nodes to form a workflow.

# Create tasks
task1 = Task("Analyze market trends")
task2 = Task("Generate product recommendations")
task3 = Task("Summarize findings")

# Add tasks to the graph using the chain operator
graph.add(task1 >> task2 >> task3)

# Alternatively, add tasks individually
graph.add(task1)
graph.add(task2)
graph.add(task3)

Note: Tasks in a graph work exactly the same as regular Tasks in Upsonic. You can provide prompts, tools, and context just as you would with standalone tasks.

Using Direct in Graph

The Direct interface provides a simpler way to interact with AI models without creating a full Agent configuration.

filename
from upsonic import Direct, Task
from upsonic import Graph

# Create a Direct interface to a model
direct = Direct()

# Create a graph with the Direct interface
graph = Graph(default_agent=direct)

# Create and connect tasks
question1 = Task("What is artificial intelligence?")
question2 = Task("How does it impact business operations?")
question3 = Task("Summarize the key takeaways")

# Add the task chain to the graph
graph.add(question1 >> question2 >> question3)

# Run the graph
graph.run()
print(graph.get_output())

Running a Graph

Once you’ve created a graph and added tasks, you can run the graph to execute all tasks in the proper order.

# Run the graph with default settings
result = graph.run()

# Run with verbose output to see detailed information
result = graph.run(verbose=True)

# Get the final output (result of the last task)
final_output = graph.get_output()

# Get output of a specific task by description
specific_output = graph.get_task_output("Analyze market trends")

When tasks are executed, outputs from each task are stored in the graph’s state and can be accessed by subsequent tasks. This allows for complex workflows where later tasks build upon the results of earlier ones.

Creating Decision Nodes

Decision nodes allow you to create conditional branches in your graph, enabling dynamic workflows that can take different paths based on the output of previous tasks. Upsonic provides two types of decision nodes:

Decision via Code (DecisionFunc)

The DecisionFunc allows you to use a custom Python function to determine which path to take in your graph based on the output of previous tasks.

from upsonic import Agent, Task
from upsonic import Graph, DecisionFunc

# Create an agent and graph
agent = Agent("Test Agent")
graph = Graph(default_agent=agent)

# Create tasks
country_task = Task("What's an interesting country in Central Asia?")
geography_task = Task("What is the geography of this country?")
culture_task = Task("What is the culture of this country?")
mountain_task = Task("What is the most popular mountain in this country?")

# Define a decision function
def has_mountains(output):
    return "mountain" in output.lower()

# Create a decision node
decision = DecisionFunc("Has mountains?", has_mountains)

# Add tasks with conditional branching to the graph
graph.add(country_task >> geography_task >> decision.if_true(mountain_task).if_false(culture_task))

In this example, after getting information about a country’s geography, the decision function checks if the word “mountain” appears in the output. If it does, the graph will execute the mountain task; otherwise, it will execute the culture task.

Decision via LLM (DecisionLLM)

The DecisionLLM uses an AI model to make decisions based on the output of previous tasks, allowing for more complex decision-making without writing custom code.

from upsonic import Agent, Task
from upsonic import Graph, DecisionLLM

# Create an agent and graph
agent = Agent("Test Agent")
graph = Graph(default_agent=agent)

# Create tasks
country_task = Task("What's an interesting country which has the biggest mountains?")
geography_task = Task("What is the geography of this country?")
culture_task = Task("What is the culture of this country?")
mountain_task = Task("What is the most popular mountain in this country?")

# Create a decision node using LLM
decision = DecisionLLM("Has the biggest trains?")

# Add tasks with conditional branching to the graph
graph.add(country_task >> geography_task >> decision.if_true(mountain_task).if_false(culture_task))

In this example, the LLM will analyze the output from the geography task and determine whether the country has “the biggest trains” based on the content. The decision will then direct the flow to either the mountain task or the culture task.

Decision nodes provide powerful flexibility in creating dynamic workflows that can adapt based on the content and context of previous task outputs.

Running a Graph

Once you’ve created a graph and added tasks, you can run the graph to execute all tasks in the proper order.

# Run the graph with default settings
result = graph.run()

# Run with verbose output to see detailed information
result = graph.run(verbose=True)

# Get the final output (result of the last task)
final_output = graph.get_output()

# Get output of a specific task by description
specific_output = graph.get_task_output("Analyze market trends")

When tasks are executed, outputs from each task are stored in the graph’s state and can be accessed by subsequent tasks. This allows for complex workflows where later tasks build upon the results of earlier ones.