Skip to main content
1

Create a Python file

run_workflows.py
import asyncio

from agno.client import AgentOSClient


async def run_workflow_non_streaming():
    """Run the first available workflow once and print its final result.

    Connects to the AgentOS server at http://localhost:7777, reads its
    config, and returns early (after printing a notice) when no workflows
    are listed. Otherwise the first workflow is run with a fixed question
    and the run ID and content of the result are printed.

    Exceptions raised by the run itself, or while printing its result, are
    caught and reported on stdout instead of propagating.
    """
    divider = "=" * 60
    print(divider)
    print("Non-Streaming Workflow Run")
    print(divider)

    os_client = AgentOSClient(base_url="http://localhost:7777")

    # Ask the server which workflows it exposes.
    os_config = await os_client.aget_config()
    available = os_config.workflows
    if not available:
        print("No workflows available")
        return

    target_id = available[0].id
    print(f"Running workflow: {target_id}")

    question = "What are the benefits of using Python for data science?"
    try:
        run_output = await os_client.run_workflow(
            workflow_id=target_id,
            message=question,
        )

        # Kept inside the try so a malformed result is reported, not raised.
        print(f"\nRun ID: {run_output.run_id}")
        print(f"Content: {run_output.content}")
    except Exception as exc:
        print(f"Error: {exc}")

async def run_workflow_streaming():
    """Execute a streaming workflow run and echo its content as it arrives.

    Connects to the AgentOS server at http://localhost:7777, reads its
    config, and returns early (after printing a notice) when no workflows
    are listed. Otherwise the first workflow is streamed with a fixed
    prompt; content carried by ``RunContent`` and ``WorkflowAgentCompleted``
    events is printed without trailing newlines so the chunks read as one
    continuous response.

    Exceptions raised while streaming are caught and reported on stdout
    (with the exception type name) instead of propagating.
    """
    print("\n" + "=" * 60)
    print("Streaming Workflow Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available workflows
    config = await client.aget_config()
    if not config.workflows:
        print("No workflows available")
        return

    workflow_id = config.workflows[0].id
    print(f"Streaming from workflow: {workflow_id}")
    print("\nResponse: ", end="", flush=True)

    try:
        async for event in client.run_workflow_stream(
            workflow_id=workflow_id,
            message="Explain machine learning in simple terms.",
        ):
            event_type = event.event
            # Not every event type is guaranteed to carry a content field.
            content = getattr(event, "content", None)

            if event_type == "RunContent":
                # Skip content-less chunks: printing them would inject the
                # literal text "None" into the middle of the response.
                if content is not None:
                    print(content, end="", flush=True)
            elif event_type == "WorkflowAgentCompleted" and content:
                print(content, end="", flush=True)

        print("\n")
    except Exception as e:
        print(f"\nError: {type(e).__name__}: {e}")


async def main():
    """Run the non-streaming demo, then the streaming demo, one after the other."""
    for demo in (run_workflow_non_streaming, run_workflow_streaming):
        await demo()


# Run both demos only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
2

Set up your virtual environment

uv venv --python 3.12
source .venv/bin/activate
3

Install dependencies

uv pip install -U agno openai
4

Export your OpenAI API key

export OPENAI_API_KEY="your_openai_api_key_here"
5

Start an AgentOS Server

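Make sure you have an AgentOS server running at http://localhost:7777 (the address this script connects to) with at least one workflow configured; the script runs the first workflow the server lists. See "Creating Your First OS" for setup instructions.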
6

Run the Client

python run_workflows.py