Learn how to use async/await with the Gravix Layer SDK for better performance in concurrent applications.

Why Use Async?

Async programming allows you to:
  • Handle multiple requests concurrently - Process many API calls simultaneously
  • Improve application responsiveness - Don’t block while waiting for API responses
  • Build scalable applications - Better resource utilization for high-throughput apps

AsyncGravixLayer Client

Use the async client for non-blocking operations:
import asyncio
from gravixlayer import AsyncGravixLayer

async def main():
    client = AsyncGravixLayer()
    response = await client.chat.completions.create(
        model="meta-llama/llama-3.1-8b-instruct",
        messages=[{"role": "user", "content": "Hello!"}]
    )
    print(response.choices[0].message.content)

asyncio.run(main())

Concurrent Requests

Process multiple requests simultaneously for better performance:
import asyncio
from gravixlayer import AsyncGravixLayer

async def process_prompts():
    client = AsyncGravixLayer()
    prompts = [
        "What is AI?",
        "Explain machine learning",
        "How does deep learning work?"
    ]
    
    # Create tasks for concurrent execution
    tasks = [
        client.chat.completions.create(
            model="meta-llama/llama-3.1-8b-instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        for prompt in prompts
    ]
    
    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    
    for i, result in enumerate(results):
        print(f"Response {i+1}: {result.choices[0].message.content}")

asyncio.run(process_prompts())
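
asyncio.gather returns results in the same order as the prompts. If you would rather handle each response as soon as it arrives, the standard-library asyncio.as_completed works with the same requests. This is a minimal sketch reusing the client and model from the example above (the function name process_as_completed is just illustrative):
import asyncio
from gravixlayer import AsyncGravixLayer

async def process_as_completed():
    client = AsyncGravixLayer()
    prompts = [
        "What is AI?",
        "Explain machine learning",
        "How does deep learning work?"
    ]

    tasks = [
        client.chat.completions.create(
            model="meta-llama/llama-3.1-8b-instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        for prompt in prompts
    ]

    # as_completed yields awaitables in completion order, not submission order
    for finished in asyncio.as_completed(tasks):
        result = await finished
        print(result.choices[0].message.content)

asyncio.run(process_as_completed())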

Async Streaming

Stream responses asynchronously:
import asyncio
from gravixlayer import AsyncGravixLayer

async def stream_response():
    client = AsyncGravixLayer()
    # Note: When stream=True, create() returns an async generator directly (no await needed)
    stream = client.chat.completions.create(
        model="meta-llama/llama-3.1-8b-instruct",
        messages=[{"role": "user", "content": "Write a short story"}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end='')

asyncio.run(stream_response())

Error Handling

Handle errors gracefully in async code:
import asyncio
from gravixlayer import AsyncGravixLayer
from gravixlayer.types.exceptions import GravixLayerError

async def safe_request():
    client = AsyncGravixLayer()
    try:
        response = await client.chat.completions.create(
            model="meta-llama/llama-3.1-8b-instruct",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        return response.choices[0].message.content
    except GravixLayerError as e:
        print(f"API Error: {e}")
        return None

result = asyncio.run(safe_request())
if result:
    print(result)

Best Practices

  1. Limit concurrency - Don’t overwhelm the API with too many concurrent requests
  2. Handle rate limits - Implement backoff strategies for rate-limited requests (see the retry sketch at the end of this page)
  3. Set timeouts - Prevent hanging requests with appropriate timeouts
  4. Handle exceptions - Use try-except blocks and gather with return_exceptions=True
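
The example below combines these practices: a semaphore to limit concurrency, a client-level timeout, and gather with return_exceptions=True: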
import asyncio
from gravixlayer import AsyncGravixLayer

async def optimized_batch():
    # Limit concurrent requests
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests
    
    async def limited_request(client, prompt):
        async with semaphore:
            return await client.chat.completions.create(
                model="meta-llama/llama-3.1-8b-instruct",
                messages=[{"role": "user", "content": prompt}]
            )
    
    client = AsyncGravixLayer(timeout=30)
    prompts = [f"Process item {i}" for i in range(20)]
    
    tasks = [limited_request(client, prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Request {i} failed: {result}")
        else:
            print(f"Request {i} succeeded")

asyncio.run(optimized_batch())
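
For rate limits, a simple exponential backoff wrapper can be layered on top of the same pattern. This is a sketch, not a prescribed SDK feature: it assumes failed requests raise GravixLayerError (as in the error handling example above), and the helper name request_with_backoff and the retry delays are illustrative choices you should tune for your workload:
import asyncio
from gravixlayer import AsyncGravixLayer
from gravixlayer.types.exceptions import GravixLayerError

async def request_with_backoff(client, prompt, max_retries=3):
    # Retry with exponential backoff: 1s, 2s, 4s between attempts
    for attempt in range(max_retries + 1):
        try:
            return await client.chat.completions.create(
                model="meta-llama/llama-3.1-8b-instruct",
                messages=[{"role": "user", "content": prompt}]
            )
        except GravixLayerError as e:
            if attempt == max_retries:
                raise  # Out of retries; let the caller handle the error
            delay = 2 ** attempt
            print(f"Request failed ({e}); retrying in {delay}s")
            await asyncio.sleep(delay)

async def main():
    client = AsyncGravixLayer(timeout=30)
    response = await request_with_backoff(client, "Hello!")
    print(response.choices[0].message.content)

asyncio.run(main())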