Goals
- Understand when and where to use the asynchronous client features
- Use the async client to efficiently serve environments that use sandboxes
- Use the async client to run multiple agents in parallel
Prerequisites
Basics
Previously, we’ve seen basic examples using the synchronous client. For example, interacting with an environment:
from openreward import OpenReward

client = OpenReward()
env = client.environments.get("GeneralReasoning/counter")
tasks = env.list_tasks(split="train")

with env.session(tasks[0]) as session:
    # your agent logic here
    result = session.call_tool("increment", {"amount": 1})
This simple synchronous interface becomes a problem when we need to use OpenReward features concurrently, for example when running an evaluation over a suite of environments and tasks. Each synchronous call blocks until it completes, so requests run one at a time. To address this, OpenReward provides an asynchronous client, AsyncOpenReward. Using AsyncOpenReward, the example above becomes:
import asyncio
from openreward import AsyncOpenReward

client = AsyncOpenReward()

async def main():
    env = client.environments.get("GeneralReasoning/counter")
    tasks = await env.list_tasks(split="train")
    async with env.session(tasks[0]) as session:
        # your agent logic here
        result = await session.call_tool("increment", {"amount": 1})

if __name__ == "__main__":
    asyncio.run(main())
Currently, code that uses AsyncOpenReward must run inside an if __name__ == "__main__": block. This restriction may be lifted in the future.
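To see why the asynchronous interface helps with concurrency, here is a self-contained sketch that does not use OpenReward at all. FakeSession is a hypothetical stand-in whose call_tool simulates a 0.1 s network round trip with asyncio.sleep. Awaiting the calls one after another takes roughly the sum of their latencies, while asyncio.gather keeps them all in flight at once.

```python
import asyncio
import time


class FakeSession:
    """Hypothetical stand-in for an async session; not part of OpenReward."""

    def __init__(self, latency: float = 0.1) -> None:
        self.latency = latency

    async def call_tool(self, name: str, args: dict) -> dict:
        # Simulate waiting on a remote environment server.
        await asyncio.sleep(self.latency)
        return {"tool": name, "args": args}


async def sequential(n: int) -> float:
    """Await n calls one after another; return elapsed seconds."""
    session = FakeSession()
    start = time.perf_counter()
    for i in range(n):
        await session.call_tool("increment", {"amount": i})
    return time.perf_counter() - start


async def concurrent(n: int) -> float:
    """Issue n calls at once with asyncio.gather; return elapsed seconds."""
    session = FakeSession()
    start = time.perf_counter()
    await asyncio.gather(
        *(session.call_tool("increment", {"amount": i}) for i in range(n))
    )
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(sequential(5)):.2f}s")
    print(f"concurrent: {asyncio.run(concurrent(5)):.2f}s")
```

With five calls, the sequential version takes about five latencies and the concurrent version about one.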
Serving Sandbox Environments
Many environments use sandboxes to execute agent actions remotely inside a virtual machine. When building such an environment, it is critical to use the asynchronous client. On the backend, your environment is wrapped in a FastAPI server that can process many requests concurrently. If a tool uses the synchronous client instead, a long-running sandbox action can block the server while it waits, stalling all other connections. This drastically reduces your environment's concurrency and increases costs.
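The blocking effect can be reproduced without a server or sandbox. In this simplified model (not the actual OpenReward serving code), two hypothetical tool handlers stand in for a slow sandbox command: blocking_bash calls time.sleep, which never yields to the event loop, while async_bash awaits asyncio.sleep, which lets other requests proceed while it waits.

```python
import asyncio
import time

DELAY = 0.1  # seconds; stands in for a slow sandbox command


async def blocking_bash(request_id: int) -> int:
    # A synchronous call inside a coroutine never yields to the event loop,
    # so every other request waits until it finishes.
    time.sleep(DELAY)
    return request_id


async def async_bash(request_id: int) -> int:
    # Awaiting hands control back to the event loop while we wait.
    await asyncio.sleep(DELAY)
    return request_id


async def serve(handler, n_requests: int) -> float:
    """Run n_requests concurrent 'requests' through handler; return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler(i) for i in range(n_requests)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking handler: {asyncio.run(serve(blocking_bash, 5)):.2f}s")
    print(f"async handler:    {asyncio.run(serve(async_bash, 5)):.2f}s")
```

With five concurrent requests, the blocking handler serves them one at a time (about five times DELAY), while the async handler finishes in about one DELAY.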
Do not use the synchronous sandboxes API inside an environment:
from openreward import OpenReward, SandboxSettings
from openreward.environments import Environment, JSONObject, ToolOutput, tool

class MyEnvironment(Environment):
    def __init__(self, task_spec: JSONObject, secrets: dict[str, str] = {}) -> None:
        self.client = OpenReward()
        self.compute_settings = SandboxSettings(...)
        self.computer = self.client.sandbox(self.compute_settings)

    @tool
    def bash(self, ...) -> ToolOutput:
        # Anti-pattern: this synchronous call blocks the server until the command finishes
        result = self.computer.check_run(...)
        # ... build and return a ToolOutput from result
Instead, always use the asynchronous sandboxes API:
from openreward import AsyncOpenReward, SandboxSettings
from openreward.environments import Environment, JSONObject, ToolOutput, tool

class MyEnvironment(Environment):
    def __init__(self, task_spec: JSONObject, secrets: dict[str, str] = {}) -> None:
        self.client = AsyncOpenReward()
        self.compute_settings = SandboxSettings(...)
        self.computer = self.client.sandbox(self.compute_settings)

    @tool
    async def bash(self, ...) -> ToolOutput:
        # Important: we used the asynchronous API, so this call to check_run
        # does not block other requests
        result = await self.computer.check_run(...)
        # ... build and return a ToolOutput from result
Running Agents in Parallel
When using OpenReward for agentic reinforcement learning or evaluation, it is often desirable to run many agents in parallel. A common design pattern is to have the agent accept an asynchronous session object, so that Python's built-in asyncio library can run many agents concurrently.
For example, the agent's interface could look something like this:
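from openreward import AsyncSession

async def run_agent(session: AsyncSession):
    # call_llm and parse_tool_call are placeholders for your own model call and parser
    while True:
        llm_output = await call_llm(...)
        tool_name, tool_input = parse_tool_call(llm_output)
        tool_output = await session.call_tool(tool_name, tool_input)
        # bookkeeping around messages, tool output, etc. goes here,
        # including breaking out of the loop once the episode is finished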
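In practice the agent loop needs an exit condition, such as the environment signaling that the episode is over, and usually a turn limit as a safety net. Below is a runnable sketch of that control flow in which everything is hypothetical: ScriptedSession stands in for AsyncSession, its call_tool returns a plain dict with a "finished" flag (the real return type may differ), and call_llm and parse_tool_call are toy replacements for a model call and its parser.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ScriptedSession:
    """Hypothetical stand-in for AsyncSession: the episode ends after `steps` calls."""

    steps: int = 3
    calls: list = field(default_factory=list)

    async def call_tool(self, name: str, args: dict) -> dict:
        await asyncio.sleep(0)  # pretend to do network I/O
        self.calls.append((name, args))
        return {"output": len(self.calls), "finished": len(self.calls) >= self.steps}


async def call_llm(messages: list) -> str:
    # Hypothetical model call: always asks to increment by 1.
    await asyncio.sleep(0)
    return "increment 1"


def parse_tool_call(llm_output: str) -> tuple[str, dict]:
    # Hypothetical parser for the toy "name amount" format above.
    name, amount = llm_output.split()
    return name, {"amount": int(amount)}


async def run_agent(session: ScriptedSession, max_turns: int = 10) -> list:
    messages: list = []
    for _ in range(max_turns):  # turn limit guards against runaway episodes
        llm_output = await call_llm(messages)
        tool_name, tool_input = parse_tool_call(llm_output)
        tool_output = await session.call_tool(tool_name, tool_input)
        # Bookkeeping: record the exchange so the next LLM call can see it.
        messages.append({"assistant": llm_output, "tool": tool_output})
        if tool_output["finished"]:
            break
    return messages


if __name__ == "__main__":
    transcript = asyncio.run(run_agent(ScriptedSession(steps=3)))
    print(len(transcript))  # prints 3
```

Returning the transcript lets the caller collect each agent's result, which is what the parallel runner relies on.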
We can then run many agents in parallel with asyncio.gather:
import asyncio
from agent import run_agent
from openreward import AsyncOpenReward

async def main():
    client = AsyncOpenReward()
    env = client.environments.get("my-env")

    async def run(task):
        async with env.session(task) as session:
            return await run_agent(session)

    tasks = await env.list_tasks(split="test")
    coros = [run(task) for task in tasks]
    # unpack the coroutines so gather runs them all concurrently
    agent_outputs = await asyncio.gather(*coros)

if __name__ == "__main__":
    asyncio.run(main())
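To try the asyncio.gather pattern without network access, the sketch below swaps in a hypothetical MockEnvironment whose session method is an async context manager built with contextlib.asynccontextmanager, mirroring the shape of env.session(task) above; no real OpenReward objects are used. Each mock agent sleeps for a task-dependent time so that later tasks finish first, yet asyncio.gather still returns results in the order of the coroutines passed to it, not in completion order.

```python
import asyncio
from contextlib import asynccontextmanager


class MockSession:
    """Hypothetical session; remembers the task it was opened for."""

    def __init__(self, task: int) -> None:
        self.task = task

    async def call_tool(self, name: str, args: dict) -> int:
        # Later tasks finish sooner, to show that gather preserves input order.
        await asyncio.sleep(0.01 * (5 - self.task))
        return self.task * args["amount"]


class MockEnvironment:
    """Hypothetical environment exposing list_tasks() and session()."""

    async def list_tasks(self, split: str) -> list[int]:
        return list(range(5))

    @asynccontextmanager
    async def session(self, task: int):
        # A real session would open a connection here and close it on exit.
        yield MockSession(task)


async def run_agent(session: MockSession) -> int:
    return await session.call_tool("increment", {"amount": 10})


async def main() -> list[int]:
    env = MockEnvironment()

    async def run(task: int) -> int:
        async with env.session(task) as session:
            return await run_agent(session)

    tasks = await env.list_tasks(split="test")
    coros = [run(task) for task in tasks]
    return await asyncio.gather(*coros)


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [0, 10, 20, 30, 40]
```

Because each result lines up with its task by position, outputs can be zipped back with the task list afterwards.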