Goals

  • Understand when and where to use the asynchronous client features
  • Use the async client to efficiently serve environments that use sandboxes
  • Use the async client to run multiple agents in parallel

Prerequisites

Basics

Previously, we’ve seen basic examples using the synchronous client. For example, interacting with an environment:
from openreward import OpenReward
client = OpenReward()

env = client.environments.get("GeneralReasoning/counter")
tasks = env.list_tasks(split="train")

with env.session(tasks[0]) as session:
    # your agent logic here
    result = session.call_tool("increment", {"amount": 1})
This simple synchronous interface becomes a problem when we need to interact with OpenReward features concurrently, such as when running an evaluation over a suite of environments and tasks. To remedy this, an asynchronous version of the OpenReward client is provided, called AsyncOpenReward. Using AsyncOpenReward, the example above becomes:
import asyncio
from openreward import AsyncOpenReward
client = AsyncOpenReward()

async def main():
    env = client.environments.get("GeneralReasoning/counter")
    tasks = await env.list_tasks(split="train")

    async with env.session(tasks[0]) as session:
        # your agent logic here
        result = await session.call_tool("increment", {"amount": 1})

if __name__ == "__main__":
    asyncio.run(main())
Currently, code using AsyncOpenReward must be run inside an if __name__ == "__main__" guard. This restriction may be lifted in the future.

Serving Sandbox Environments

Many environments use sandboxes to remotely execute agent actions inside a virtual machine. When creating such an environment, it is critical to use the asynchronous client. On the backend, your environment is wrapped in a FastAPI server and can process many requests concurrently. Without AsyncOpenReward, requests that execute long-running sandbox actions can block all other connections, drastically reducing the concurrency of your environment and increasing costs.
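The effect of blocking is easy to see with a minimal, self-contained sketch that uses only the standard library (no OpenReward APIs): a synchronous time.sleep inside a coroutine stalls the whole event loop, while await asyncio.sleep lets other requests proceed.
import asyncio
import time

async def handle_request(blocking: bool) -> None:
    # Stand-in for one request that runs a long sandbox action.
    if blocking:
        time.sleep(1)           # blocks the event loop; no other request can run
    else:
        await asyncio.sleep(1)  # yields to the event loop; requests overlap

async def main():
    for blocking in (True, False):
        start = time.perf_counter()
        await asyncio.gather(*(handle_request(blocking) for _ in range(10)))
        label = "blocking" if blocking else "non-blocking"
        print(f"{label}: {time.perf_counter() - start:.1f}s")  # roughly 10s vs 1s

if __name__ == "__main__":
    asyncio.run(main())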
Do not use the synchronous sandboxes API inside an environment:
from openreward import OpenReward, SandboxSettings
from openreward.environments import Environment, JSONObject, ToolOutput, tool

class MyEnvironment(Environment):

    def __init__(self, task_spec: JSONObject, secrets: dict[str, str] = {}) -> None:
        self.client = OpenReward()
        self.compute_settings = SandboxSettings(...)
        self.computer = self.client.sandbox(self.compute_settings)

    @tool
    def bash(...) -> ToolOutput:
        # This synchronous call blocks the event loop until the sandbox returns
        result = self.computer.check_run(...)
Instead, always use the asynchronous sandboxes API:
from openreward import AsyncOpenReward, SandboxSettings
from openreward.environments import Environment, JSONObject, ToolOutput, tool

class MyEnvironment(Environment):

    def __init__(self, task_spec: JSONObject, secrets: dict[str, str] = {}) -> None:
        self.client = AsyncOpenReward()
        self.compute_settings = SandboxSettings(...)
        self.computer = self.client.sandbox(self.compute_settings)

    @tool
    async def bash(...) -> ToolOutput:
        # Important: we used the asynchronous API, so this call to check_run
        # does not block other requests
        result = await self.computer.check_run(...)
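If a tool must call code that is unavoidably synchronous (for example, a third-party library with no async interface), one standard-library pattern is to offload the call to a worker thread so the event loop stays free. A minimal sketch, where slow_sync_operation is a hypothetical placeholder:
import asyncio

def slow_sync_operation():
    # Hypothetical stand-in for a blocking call with no async equivalent.
    ...

async def my_tool():
    # Offload the blocking work to a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_sync_operation)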

Running Agents in Parallel

When using OpenReward for things like agentic reinforcement learning or evaluation, it is often desirable to run many agents in parallel. A common design pattern is to have the agent accept an asynchronous session object, so that Python's built-in asyncio library can be used to run agents in parallel. For example, the interface to the agent could look something like this (call_llm and parse_tool_call stand in for your own model call and parsing logic):
from openreward import AsyncSession


async def run_agent(session: AsyncSession):
    while True:
        llm_output = await call_llm(...)
        tool_name, tool_input = parse_tool_call(llm_output)
        tool_output = await session.call_tool(tool_name, tool_input)
        # bookkeeping around messages, tool output, etc goes here
We can then run many agents in parallel using asyncio.gather:
import asyncio
from agent import run_agent
from openreward import AsyncOpenReward

async def main():
    client = AsyncOpenReward()
    env = client.environments.get("my-env")

    async def run(task):
        async with env.session(task) as session:
            return await run_agent(session)

    tasks = await env.list_tasks(split="test")
    coros = [run(task) for task in tasks]
    agent_outputs = await asyncio.gather(*coros)

if __name__ == "__main__":
    asyncio.run(main())
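When the task list is large, you may not want every agent in flight at once. One common variation is to bound concurrency with an asyncio.Semaphore while still using asyncio.gather. The sketch below uses only the standard library; run_all and the limit of 16 are illustrative, not part of the OpenReward API.
import asyncio

async def run_all(env, tasks, run_agent, limit: int = 16):
    # Allow at most `limit` sessions to be open at the same time.
    semaphore = asyncio.Semaphore(limit)

    async def run(task):
        async with semaphore:
            async with env.session(task) as session:
                return await run_agent(session)

    return await asyncio.gather(*(run(task) for task in tasks))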