Learn how to use async/await with the Gravix Layer SDK for better performance in concurrent applications.

Why Use Async?

Async programming allows you to:
  • Handle multiple requests concurrently - Process many API calls simultaneously
  • Improve application responsiveness - Don’t block while waiting for API responses
  • Build scalable applications - Better resource utilization for high-throughput apps

AsyncGravixLayer Client

Use the async client for non-blocking operations:
import asyncio
from gravixlayer import AsyncGravixLayer

async def main():
    async with AsyncGravixLayer() as client:
        response = await client.chat.completions.create(
            model="meta-llama/llama-3.1-8b-instruct",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        print(response.choices[0].message.content)

asyncio.run(main())
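
If your code is already running inside an event loop (for example a Jupyter notebook or an async web framework handler), await the coroutine directly instead of calling asyncio.run(). A minimal sketch, using only the client calls shown above; the ask helper name is illustrative:
from gravixlayer import AsyncGravixLayer

# Inside an already-running event loop, await the client directly
# rather than calling asyncio.run(), which would raise an error.
async def ask(prompt: str) -> str:
    async with AsyncGravixLayer() as client:
        response = await client.chat.completions.create(
            model="meta-llama/llama-3.1-8b-instruct",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

# e.g. in a notebook cell or another coroutine: answer = await ask("Hello!")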

Concurrent Requests

Process multiple requests simultaneously for better performance:
import asyncio
from gravixlayer import AsyncGravixLayer

async def process_prompts():
    async with AsyncGravixLayer() as client:
        prompts = [
            "What is AI?",
            "Explain machine learning",
            "How does deep learning work?"
        ]
        
        # Create tasks for concurrent execution
        tasks = [
            client.chat.completions.create(
                model="meta-llama/llama-3.1-8b-instruct",
                messages=[{"role": "user", "content": prompt}]
            )
            for prompt in prompts
        ]
        
        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks)
        
        for i, result in enumerate(results):
            print(f"Response {i+1}: {result.choices[0].message.content}")

asyncio.run(process_prompts())
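
If you want to handle each response as soon as it finishes rather than waiting for the whole batch, the standard-library asyncio.as_completed yields results in completion order. A minimal sketch building on the same client and model as above:
import asyncio
from gravixlayer import AsyncGravixLayer

async def process_as_completed():
    async with AsyncGravixLayer() as client:
        prompts = ["What is AI?", "Explain machine learning"]

        # Wrap each request in a task so they all start immediately
        tasks = [
            asyncio.create_task(
                client.chat.completions.create(
                    model="meta-llama/llama-3.1-8b-instruct",
                    messages=[{"role": "user", "content": prompt}]
                )
            )
            for prompt in prompts
        ]

        # Process each response as soon as it completes, not in submission order
        for finished in asyncio.as_completed(tasks):
            response = await finished
            print(response.choices[0].message.content)

asyncio.run(process_as_completed())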

Async Streaming

Stream responses asynchronously:
import asyncio
from gravixlayer import AsyncGravixLayer

async def stream_response():
    async with AsyncGravixLayer() as client:
        stream = await client.chat.completions.create(
            model="meta-llama/llama-3.1-8b-instruct",
            messages=[{"role": "user", "content": "Write a short story"}],
            stream=True
        )
        
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end='')

asyncio.run(stream_response())
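
If you also need the complete text after streaming (for logging or further processing), accumulate the chunks as they arrive. A small sketch extending the streaming example above:
import asyncio
from gravixlayer import AsyncGravixLayer

async def stream_and_collect():
    async with AsyncGravixLayer() as client:
        stream = await client.chat.completions.create(
            model="meta-llama/llama-3.1-8b-instruct",
            messages=[{"role": "user", "content": "Write a short story"}],
            stream=True
        )

        parts = []
        async for chunk in stream:
            delta = chunk.choices[0].delta.content
            if delta:
                print(delta, end='', flush=True)
                parts.append(delta)

        # Full response text assembled from the streamed chunks
        return ''.join(parts)

story = asyncio.run(stream_and_collect())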

Error Handling

Handle errors gracefully in async code:
import asyncio
from gravixlayer import AsyncGravixLayer
from gravixlayer.exceptions import GravixLayerError

async def safe_request():
    async with AsyncGravixLayer() as client:
        try:
            response = await client.chat.completions.create(
                model="meta-llama/llama-3.1-8b-instruct",
                messages=[{"role": "user", "content": "Hello!"}]
            )
            return response.choices[0].message.content
        except GravixLayerError as e:
            print(f"API Error: {e}")
            return None

result = asyncio.run(safe_request())
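
Requests can also hang rather than fail outright. In addition to catching SDK exceptions, you can enforce a per-request timeout with the standard-library asyncio.wait_for. This sketch assumes only the GravixLayerError exception shown above; the 15-second limit is an arbitrary example value:
import asyncio
from gravixlayer import AsyncGravixLayer
from gravixlayer.exceptions import GravixLayerError

async def request_with_timeout():
    async with AsyncGravixLayer() as client:
        try:
            # Cancel the request if no response arrives within 15 seconds
            response = await asyncio.wait_for(
                client.chat.completions.create(
                    model="meta-llama/llama-3.1-8b-instruct",
                    messages=[{"role": "user", "content": "Hello!"}]
                ),
                timeout=15
            )
            return response.choices[0].message.content
        except asyncio.TimeoutError:
            print("Request timed out")
        except GravixLayerError as e:
            print(f"API Error: {e}")
        return None

result = asyncio.run(request_with_timeout())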

Best Practices

  1. Use context managers - Always use async with for automatic cleanup
  2. Limit concurrency - Don’t overwhelm the API with too many concurrent requests
  3. Handle rate limits - Implement backoff strategies for rate-limited requests (see the retry sketch below)
  4. Set timeouts - Prevent hanging requests with appropriate timeouts

The example below combines a semaphore for limiting concurrency with a client-level timeout:
import asyncio
from gravixlayer import AsyncGravixLayer

async def optimized_batch():
    # Limit concurrent requests
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests
    
    async def limited_request(client, prompt):
        async with semaphore:
            return await client.chat.completions.create(
                model="meta-llama/llama-3.1-8b-instruct",
                messages=[{"role": "user", "content": prompt}]
            )
    
    async with AsyncGravixLayer(timeout=30) as client:
        prompts = [f"Process item {i}" for i in range(20)]
        
        tasks = [limited_request(client, prompt) for prompt in prompts]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Request {i} failed: {result}")
            else:
                print(f"Request {i} succeeded")

asyncio.run(optimized_batch())
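
For best practice 3, a simple exponential backoff retries a failed request with increasing delays. This is a sketch that retries on any GravixLayerError; a production implementation would ideally inspect the error and retry only on rate-limit responses:
import asyncio
from gravixlayer import AsyncGravixLayer
from gravixlayer.exceptions import GravixLayerError

async def request_with_backoff(client, prompt, max_retries=3):
    delay = 1  # initial backoff in seconds
    for attempt in range(max_retries + 1):
        try:
            return await client.chat.completions.create(
                model="meta-llama/llama-3.1-8b-instruct",
                messages=[{"role": "user", "content": prompt}]
            )
        except GravixLayerError as e:
            if attempt == max_retries:
                raise  # give up after the final attempt
            print(f"Attempt {attempt + 1} failed ({e}), retrying in {delay}s")
            await asyncio.sleep(delay)
            delay *= 2  # double the delay after each failure

async def main():
    async with AsyncGravixLayer(timeout=30) as client:
        response = await request_with_backoff(client, "Hello!")
        print(response.choices[0].message.content)

asyncio.run(main())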