
Advanced API Integration and Automation with Qwen AI Image Editor

Build scalable, production-ready image editing workflows with comprehensive API integration patterns and automation strategies.

This guide is for developers integrating Qwen AI Image Editor into their applications and workflows. Building on our previous articles on quick start and advanced techniques, it focuses on programmatic access, automation, and enterprise-level deployment strategies.

Whether you’re building a SaaS application, automating content creation pipelines, or integrating AI image editing into existing systems, this comprehensive resource provides the technical depth and practical examples needed for successful implementation.

Why API Integration Matters

The true power of Qwen AI Image Editor emerges when you move beyond manual editing to automated, scalable workflows. API integration enables:

This guide assumes you have development experience and are familiar with REST APIs, basic authentication concepts, and at least one programming language (Python, JavaScript/Node.js, or similar).


Understanding the Qwen AI Image Editor API Architecture

Before diving into implementation, it’s crucial to understand the API’s architecture and design principles.

Core Components

The Qwen AI Image Editor API consists of several interconnected components:

API Gateway: Handles incoming requests, authentication, and rate limiting
Processing Engine: Manages image editing tasks and queue processing
Storage Layer: Handles temporary and persistent file storage
Monitoring System: Tracks usage, performance, and errors

Request-Response Flow

Client Application → API Gateway → Authentication → Queue → Processing Engine → Storage → Response

This architecture ensures:

Key Design Principles


Authentication and API Access

Proper authentication is fundamental for secure API integration. Qwen AI Image Editor offers multiple authentication methods to suit different use cases.

API Key Authentication

The most straightforward method for most applications:

import requests

# API key from your Qwen AI dashboard
API_KEY = "your_api_key_here"
BASE_URL = "https://api.qwen.ai/v1"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

def test_authentication():
    """Test API key validity"""
    response = requests.get(f"{BASE_URL}/auth/validate", headers=headers)
    return response.status_code == 200

if test_authentication():
    print("Authentication successful!")
else:
    print("Authentication failed - check your API key")

OAuth 2.0 Integration

For applications requiring user-specific access:

const express = require('express');
const axios = require('axios');

const app = express();
const CLIENT_ID = 'your_client_id';
const CLIENT_SECRET = 'your_client_secret';
const REDIRECT_URI = 'https://your-app.com/callback';

// OAuth 2.0 authorization flow
app.get('/auth/qwen', (req, res) => {
    const authUrl = `https://qwen.ai/oauth/authorize?` +
        `client_id=${CLIENT_ID}&` +
        `redirect_uri=${encodeURIComponent(REDIRECT_URI)}&` +
        `response_type=code&` +
        `scope=image_edit`;

    res.redirect(authUrl);
});

// Handle OAuth callback
app.get('/callback', async (req, res) => {
    const { code } = req.query;

    try {
        const response = await axios.post('https://qwen.ai/oauth/token', {
            grant_type: 'authorization_code',
            client_id: CLIENT_ID,
            client_secret: CLIENT_SECRET,
            code: code,
            redirect_uri: REDIRECT_URI
        });

        const { access_token } = response.data;
        // Store access_token securely server-side (e.g. in the user's session)
        // for future API calls; avoid echoing it back to the browser
        res.json({ success: true });
    } catch (error) {
        res.status(500).json({ error: 'Authentication failed' });
    }
});
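
One thing the flow above leaves out is the `state` parameter, which RFC 6749 recommends as protection against cross-site request forgery on the callback. The following is a minimal Python sketch of that idea, not part of any official SDK: the authorize URL and scope are copied from the example above, while `build_authorize_url` and `state_matches` are hypothetical helper names. It also lets `urlencode` escape every query value.

```python
import secrets
import hmac
from urllib.parse import urlencode, urlparse, parse_qs

AUTHORIZE_URL = "https://qwen.ai/oauth/authorize"  # taken from the example above


def build_authorize_url(client_id, redirect_uri, scope="image_edit"):
    """Return (url, state). Keep state in the user's session until the callback."""
    state = secrets.token_urlsafe(32)  # unguessable, URL-safe random value
    query = urlencode({
        "client_id": client_id,
        "redirect_uri": redirect_uri,  # urlencode escapes this value
        "response_type": "code",
        "scope": scope,
        "state": state,
    })
    return f"{AUTHORIZE_URL}?{query}", state


def state_matches(expected, received):
    """Compare the stored and returned state values in constant time."""
    if not expected or not received:
        return False
    return hmac.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))
```

In the callback handler, reject the request when `state_matches(session_state, request_state)` is false, before exchanging the code for a token.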

Managing API Keys Securely

Never hardcode API keys in your application. Use environment variables or secure configuration management:

import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

class QwenAPIConfig:
    def __init__(self):
        self.api_key = os.getenv('QWEN_API_KEY')
        self.base_url = os.getenv('QWEN_API_URL', 'https://api.qwen.ai/v1')
        self.timeout = int(os.getenv('QWEN_API_TIMEOUT', '300'))

        if not self.api_key:
            raise ValueError("QWEN_API_KEY environment variable is required")

# Usage in your application
config = QwenAPIConfig()
# QwenAPIClient is a placeholder for your own client wrapper class
api_client = QwenAPIClient(config)
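
The `QwenAPIClient` name used above can be any thin wrapper that accepts the configuration object. As a rough sketch only (the class, its methods, and the `ClientConfig` stand-in are assumptions for illustration, not an official SDK), such a wrapper might centralize the base URL, the bearer header, and the timeout. It uses the standard library's `urllib` so the example has no third-party dependencies.

```python
import json
import urllib.request
from dataclasses import dataclass


@dataclass
class ClientConfig:
    """Stand-in carrying the same fields as QwenAPIConfig above."""
    api_key: str
    base_url: str = "https://api.qwen.ai/v1"
    timeout: int = 300


class QwenAPIClient:
    """Hypothetical thin wrapper; not an official SDK class."""

    def __init__(self, config):
        self.config = config

    def build_request(self, method, path, payload=None):
        """Build an authenticated request object without sending it."""
        body = json.dumps(payload).encode("utf-8") if payload is not None else None
        url = f"{self.config.base_url.rstrip('/')}/{path.lstrip('/')}"
        return urllib.request.Request(
            url,
            data=body,
            method=method,
            headers={
                "Authorization": f"Bearer {self.config.api_key}",
                "Content-Type": "application/json",
            },
        )

    def send(self, request):
        """Send the request and decode the JSON response body."""
        with urllib.request.urlopen(request, timeout=self.config.timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
```

Separating `build_request` from `send` keeps the header and URL logic testable without network access.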

Rate Limiting and Quota Management

Understanding and working within rate limits is crucial for production applications:

import time
from functools import wraps
from datetime import datetime, timedelta

class RateLimiter:
    def __init__(self, max_requests_per_minute=60):
        self.max_requests = max_requests_per_minute
        self.requests = []

    def wait_if_needed(self):
        """Wait if rate limit would be exceeded"""
        now = datetime.now()
        one_minute_ago = now - timedelta(minutes=1)

        # Remove requests older than 1 minute
        self.requests = [req_time for req_time in self.requests if req_time > one_minute_ago]

        if len(self.requests) >= self.max_requests:
            # Calculate wait time
            oldest_request = min(self.requests)
            wait_time = (oldest_request + timedelta(minutes=1)) - now

            if wait_time.total_seconds() > 0:
                print(f"Rate limit reached. Waiting {wait_time.total_seconds():.2f} seconds...")
                time.sleep(wait_time.total_seconds())

    def record_request(self):
        """Record a new request"""
        self.requests.append(datetime.now())

def rate_limit(max_requests_per_minute=60):
    """Decorator for rate limiting API calls"""
    def decorator(func):
        limiter = RateLimiter(max_requests_per_minute)

        @wraps(func)
        def wrapper(*args, **kwargs):
            limiter.wait_if_needed()
            result = func(*args, **kwargs)
            limiter.record_request()
            return result

        return wrapper
    return decorator

# Usage
@rate_limit(max_requests_per_minute=60)
def edit_image(image_data, prompt):
    # Your API call here
    pass
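
Client-side throttling reduces the chance of hitting a limit but does not rule it out, so it is common to pair it with retries. The sketch below shows capped exponential backoff with full jitter. It assumes the API signals throttling with HTTP 429 and possibly a Retry-After value, which this guide does not confirm; `RateLimitedError` and `call_with_backoff` are hypothetical names, and your request function is expected to raise the exception itself.

```python
import random
import time


class RateLimitedError(Exception):
    """Raised by the caller's request function on a throttled response."""

    def __init__(self, retry_after=None):
        super().__init__("rate limited")
        self.retry_after = retry_after  # seconds, if the server supplied a hint


def call_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=60.0,
                      sleep=time.sleep):
    """Call func(); on RateLimitedError wait and retry, up to max_attempts."""
    for attempt in range(max_attempts):
        try:
            return func()
        except RateLimitedError as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the error
            if exc.retry_after is not None:
                delay = exc.retry_after  # prefer the server's hint
            else:
                # Full jitter: random wait up to the capped exponential bound
                delay = random.uniform(0, min(max_delay, base_delay * 2 ** attempt))
            sleep(delay)
```

The `sleep` parameter exists so the waiting behaviour can be replaced in tests.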

Core API Endpoints and Implementation

The Qwen AI Image Editor API provides comprehensive endpoints for all image editing operations. Let’s explore the most important ones with practical implementations.

Image Upload and Preparation

Before editing, images must be uploaded and prepared:

import base64
import io
import time  # used to build the upload filename

import requests
from PIL import Image

class QwenImageUploader:
    def __init__(self, api_key, base_url="https://api.qwen.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def upload_image_from_file(self, file_path):
        """Upload image from local file"""
        with open(file_path, 'rb') as image_file:
            image_data = image_file.read()

        return self.upload_image_data(image_data, file_path)

    def upload_image_from_url(self, image_url):
        """Upload image from URL"""
        response = requests.get(image_url)
        response.raise_for_status()

        return self.upload_image_data(response.content, image_url)

    def upload_image_data(self, image_data, source_name="unknown"):
        """Upload raw image data"""
        # Validate image
        try:
            image = Image.open(io.BytesIO(image_data))
            # Convert to RGB if necessary
            if image.mode != 'RGB':
                image = image.convert('RGB')

            # Cap dimensions at 2048x2048 to keep the payload small (API limit is typically 10MB)
            max_size = (2048, 2048)
            image.thumbnail(max_size, Image.Resampling.LANCZOS)

            # Save to bytes
            img_buffer = io.BytesIO()
            image.save(img_buffer, format='JPEG', quality=90)
            img_buffer.seek(0)

            # Encode to base64
            base64_image = base64.b64encode(img_buffer.getvalue()).decode('utf-8')

        except Exception as e:
            # Chain the original exception so the root cause stays visible
            raise ValueError(f"Invalid image data: {e}") from e

        # Upload to API
        upload_data = {
            "image": base64_image,
            "filename": f"upload_{int(time.time())}.jpg",
            "source": source_name
        }

        response = requests.post(
            f"{self.base_url}/images/upload",
            headers=self.headers,
            json=upload_data
        )

        if response.status_code == 200:
            return response.json()  # Returns image_id and temporary_url
        else:
            raise Exception(f"Upload failed: {response.text}")

# Usage example
uploader = QwenImageUploader("your_api_key")
result = uploader.upload_image_from_file("path/to/your/image.jpg")
image_id = result['image_id']
print(f"Image uploaded successfully with ID: {image_id}")
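
Because the image is sent as base64 inside a JSON body, the payload is roughly a third larger than the JPEG bytes. The sketch below checks the encoded size before uploading. The 10 MB figure comes from the comment in the uploader above, and whether the limit applies to the encoded or the raw size is an assumption; both helper names are hypothetical.

```python
import base64
import math

MAX_UPLOAD_BYTES = 10 * 1024 * 1024  # assumed limit, from the "10MB" comment above


def base64_encoded_size(raw_size):
    """Length in bytes of the padded base64 encoding of raw_size bytes."""
    return 4 * math.ceil(raw_size / 3)


def fits_upload_limit(image_bytes, limit=MAX_UPLOAD_BYTES):
    """True if the base64-encoded payload stays within the limit."""
    return base64_encoded_size(len(image_bytes)) <= limit
```

If the check fails, lowering the JPEG quality or the maximum dimensions before encoding is the usual remedy.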

Image Editing Operations

Core editing functionality with comprehensive parameter support:

class QwenImageEditor:
    def __init__(self, api_key, base_url="https://api.qwen.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def edit_image(self, image_id, prompt, **kwargs):
        """Perform image editing with comprehensive parameters"""

        edit_params = {
            "image_id": image_id,
            "prompt": prompt,
            # Core parameters
            "num_inference_steps": kwargs.get('steps', 50),
            "guidance_scale": kwargs.get('guidance_scale', 7.5),
            "negative_prompt": kwargs.get('negative_prompt', ""),
            "seed": kwargs.get('seed', -1),  # -1 for random

            # Output parameters
            "width": kwargs.get('width', 512),
            "height": kwargs.get('height', 512),
            "output_format": kwargs.get('format', 'jpeg'),
            "quality": kwargs.get('quality', 90),

            # Advanced parameters
            "true_cfg_scale": kwargs.get('true_cfg_scale', 4.0),
            "controlnet_conditioning_scale": kwargs.get('controlnet_scale', 1.0),

            # Processing options
            "async": kwargs.get('async', False),  # Async processing for large images
            "webhook_url": kwargs.get('webhook_url'),  # Callback URL for async jobs
        }

        response = requests.post(
            f"{self.base_url}/images/edit",
            headers=self.headers,
            json=edit_params
        )

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Edit failed: {response.text}")

    def edit_image_sync(self, image_id, prompt, **kwargs):
        """Synchronous image editing with timeout handling"""
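        # "async" is a reserved word in Python 3.7+, so it cannot be written as a
        # keyword argument; pass it through a dict instead
        result = self.edit_image(image_id, prompt, **{**kwargs, "async": False})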

        if result.get('status') == 'completed':
            return result
        elif result.get('status') == 'processing':
            # Poll for completion (with timeout)
            job_id = result['job_id']
            return self._poll_job_completion(job_id, timeout=300)
        else:
            raise Exception(f"Edit failed with status: {result.get('status')}")

    def _poll_job_completion(self, job_id, timeout=300):
        """Poll async job until completion or timeout"""
        start_time = time.time()

        while time.time() - start_time < timeout:
            response = requests.get(
                f"{self.base_url}/jobs/{job_id}",
                headers=self.headers
            )

            if response.status_code == 200:
                job_status = response.json()

                if job_status['status'] == 'completed':
                    return job_status
                elif job_status['status'] == 'failed':
                    raise Exception(f"Job failed: {job_status.get('error', 'Unknown error')}")

                # Wait before next poll
                time.sleep(2)
            else:
                raise Exception(f"Failed to check job status: {response.text}")

        raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")

# Usage examples
editor = QwenImageEditor("your_api_key")

# Basic edit
result = editor.edit_image_sync(
    image_id="your_image_id",
    prompt="Convert to cyberpunk style with neon lighting",
    steps=50,
    guidance_scale=7.5
)

# Advanced edit with custom parameters
result = editor.edit_image_sync(
    image_id="your_image_id",
    prompt="Replace background with mountain landscape, keep subject unchanged",
    negative_prompt="blurry, low quality, distorted",
    width=1024,
    height=1024,
    quality=95,
    true_cfg_scale=5.0
)

print(f"Edit completed! Result URL: {result.get('output_url')}")
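
The polling loop above checks every two seconds regardless of how long the job has been running. A variation, sketched here under the same assumed `status` values (`completed`, `failed`, anything else meaning still running), lengthens the interval gradually and never sleeps past the deadline. `poll_until` is a hypothetical helper, and `fetch_status` is any callable that returns the job status dictionary.

```python
import time


def poll_until(fetch_status, timeout=300.0, initial_interval=1.0,
               max_interval=15.0, factor=1.5,
               clock=time.monotonic, sleep=time.sleep):
    """Call fetch_status() until it reports a terminal state or time runs out."""
    deadline = clock() + timeout
    interval = initial_interval
    while True:
        status = fetch_status()
        if status.get("status") == "completed":
            return status
        if status.get("status") == "failed":
            raise RuntimeError(status.get("error", "Unknown error"))
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"Job did not complete within {timeout} seconds")
        # Never sleep past the deadline; grow the interval up to max_interval
        sleep(min(interval, remaining))
        interval = min(interval * factor, max_interval)
```

Using `time.monotonic` rather than `time.time` keeps the timeout correct even if the system clock is adjusted while waiting.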

Batch Processing Operations

Process multiple images efficiently:

import concurrent.futures
import threading
from typing import List, Dict, Any

class QwenBatchProcessor:
    def __init__(self, api_key, max_workers=5):
        self.api_key = api_key
        self.editor = QwenImageEditor(api_key)
        self.max_workers = max_workers

    def _process_single_job(self, job: Dict[str, Any]) -> Dict[str, Any]:
        """Run one edit job and report success or failure without raising"""
        try:
            image_id = job['image_id']
            prompt = job['prompt']
            params = job.get('params', {})

            result = self.editor.edit_image_sync(image_id, prompt, **params)

            return {
                'success': True,
                'image_id': image_id,
                'result': result,
                'job_data': job
            }

        except Exception as e:
            return {
                'success': False,
                'image_id': job.get('image_id', 'unknown'),
                'error': str(e),
                'job_data': job
            }

    def process_batch(self, image_jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process multiple images in parallel"""
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self._process_single_job, image_jobs))

        return results

    def process_batch_with_progress(self, image_jobs: List[Dict[str, Any]], callback=None):
        """Process batch with progress tracking"""
        total_jobs = len(image_jobs)
        completed_jobs = 0
        progress_lock = threading.Lock()  # worker threads update the counter concurrently

        def process_with_callback(job):
            nonlocal completed_jobs

            # Shared helper method, so it is reachable from both batch methods
            result = self._process_single_job(job)

            with progress_lock:
                completed_jobs += 1
                if callback:
                    callback(completed_jobs, total_jobs, result)

            return result

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(process_with_callback, image_jobs))

        return results

# Usage example
def progress_callback(completed, total, result):
    progress = (completed / total) * 100
    status = "OK" if result['success'] else "FAILED"
    print(f"{status} Progress: {progress:.1f}% ({completed}/{total}) - Image: {result['image_id']}")

batch_processor = QwenBatchProcessor("your_api_key", max_workers=3)

# Define batch jobs
batch_jobs = [
    {
        'image_id': 'img_001',
        'prompt': 'Convert to black and white',
        'params': {'steps': 30}
    },
    {
        'image_id': 'img_002',
        'prompt': 'Add vintage film effect',
        'params': {'steps': 40, 'quality': 85}
    },
    {
        'image_id': 'img_003',
        'prompt': 'Enhance colors and saturation',
        'params': {'steps': 50, 'guidance_scale': 8.0}
    }
]

# Process with progress tracking
results = batch_processor.process_batch_with_progress(batch_jobs, progress_callback)

# Summary
successful = sum(1 for r in results if r['success'])
failed = len(results) - successful

print(f"\nBatch processing completed!")
print(f"Successful: {successful}, Failed: {failed}")

# Save failed jobs for retry
failed_jobs = [r['job_data'] for r in results if not r['success']]
if failed_jobs:
    print(f"Retrying {len(failed_jobs)} failed jobs...")
    retry_results = batch_processor.process_batch(failed_jobs)

Advanced Workflow Automation

Build sophisticated automation workflows that integrate with your existing systems.

Queue-Based Processing with Redis

For high-volume, reliable processing:

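import redis
import requests  # used to send webhook notifications
import json
import pickle
import time  # used for job and result timestamps
from dataclasses import dataclass
from typing import Optional
import logging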

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ImageEditJob:
    job_id: str
    image_id: str
    prompt: str
    params: dict
    priority: int = 5  # 1-10, lower numbers = higher priority
    retries: int = 0
    max_retries: int = 3
    created_at: Optional[float] = None  # filled in by __post_init__
    webhook_url: Optional[str] = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = time.time()

class QwenWorkflowQueue:
    def __init__(self, redis_url="redis://localhost:6379", api_key=None):
        self.redis_client = redis.from_url(redis_url)
        self.api_key = api_key
        self.editor = QwenImageEditor(api_key) if api_key else None

        # Queue names
        self.queues = {
            'high_priority': 'qwen:queue:high',
            'normal_priority': 'qwen:queue:normal',
            'low_priority': 'qwen:queue:low'
        }

        # Result storage
        self.results_key = 'qwen:results'

    def submit_job(self, job: ImageEditJob) -> str:
        """Submit job to appropriate priority queue"""
        queue_name = self._get_queue_name(job.priority)

        # Serialize job
        job_data = pickle.dumps(job)

        # Add to queue
        self.redis_client.lpush(queue_name, job_data)

        logger.info(f"Submitted job {job.job_id} to {queue_name}")
        return job.job_id

    def _get_queue_name(self, priority: int) -> str:
        """Get queue name based on priority"""
        if priority <= 3:
            return self.queues['high_priority']
        elif priority <= 7:
            return self.queues['normal_priority']
        else:
            return self.queues['low_priority']

    def get_next_job(self, timeout=30) -> Optional[ImageEditJob]:
        """Get next job from queues (priority order)"""
        # BRPOP checks the given keys in order, so a single call waits on all
        # three queues at once while still serving higher priorities first.
        # Polling each queue separately would block on the high-priority queue
        # for the full timeout before looking at the others.
        result = self.redis_client.brpop(
            [self.queues['high_priority'],
             self.queues['normal_priority'],
             self.queues['low_priority']],
            timeout=timeout
        )

        if result:
            queue_name, job_data = result
            job = pickle.loads(job_data)

            logger.info(f"Retrieved job {job.job_id} from {queue_name.decode()}")
            return job

        return None

    def process_job(self, job: ImageEditJob) -> dict:
        """Process a single job"""
        try:
            logger.info(f"Processing job {job.job_id}")

            result = self.editor.edit_image_sync(
                image_id=job.image_id,
                prompt=job.prompt,
                **job.params
            )

            # Mark as successful
            job_result = {
                'job_id': job.job_id,
                'status': 'completed',
                'result': result,
                'processed_at': time.time()
            }

            # Store result
            self.redis_client.hset(
                self.results_key,
                job.job_id,
                json.dumps(job_result)
            )

            # Send webhook if specified
            if job.webhook_url:
                self._send_webhook(job.webhook_url, job_result)

            logger.info(f"Successfully completed job {job.job_id}")
            return job_result

        except Exception as e:
            logger.error(f"Failed to process job {job.job_id}: {e}")

            # Handle retries
            if job.retries < job.max_retries:
                job.retries += 1
                logger.info(f"Retrying job {job.job_id} (attempt {job.retries}/{job.max_retries})")

                # Re-queue with lower priority
                job.priority = min(job.priority + 1, 10)
                self.submit_job(job)

                return {
                    'job_id': job.job_id,
                    'status': 'retrying',
                    'retry_count': job.retries,
                    'error': str(e)
                }
            else:
                # Mark as failed
                job_result = {
                    'job_id': job.job_id,
                    'status': 'failed',
                    'error': str(e),
                    'failed_at': time.time(),
                    'retry_count': job.retries
                }

                self.redis_client.hset(
                    self.results_key,
                    job.job_id,
                    json.dumps(job_result)
                )

                # Send webhook for failure
                if job.webhook_url:
                    self._send_webhook(job.webhook_url, job_result)

                return job_result

    def _send_webhook(self, webhook_url: str, data: dict):
        """Send webhook notification"""
        try:
            response = requests.post(webhook_url, json=data, timeout=10)
            response.raise_for_status()
            logger.info(f"Webhook sent successfully to {webhook_url}")
        except Exception as e:
            logger.error(f"Failed to send webhook to {webhook_url}: {e}")

    def get_job_result(self, job_id: str) -> Optional[dict]:
        """Get result for a specific job"""
        result_data = self.redis_client.hget(self.results_key, job_id)

        if result_data:
            return json.loads(result_data)

        return None

    def clear_old_results(self, max_age_hours=24):
        """Clean up old results"""
        cutoff_time = time.time() - (max_age_hours * 3600)

        # Get all results
        all_results = self.redis_client.hgetall(self.results_key)

        for job_id, result_data in all_results.items():
            try:
                result = json.loads(result_data)

                # Check age
                processed_at = result.get('processed_at') or result.get('failed_at', 0)

                if processed_at < cutoff_time:
                    self.redis_client.hdel(self.results_key, job_id)
                    logger.info(f"Cleared old result for job {job_id}")

            except (json.JSONDecodeError, KeyError) as e:
                # Remove malformed entries
                self.redis_client.hdel(self.results_key, job_id)
                logger.warning(f"Removed malformed result for job {job_id}: {e}")

# Worker process
def worker_process(redis_url="redis://localhost:6379", api_key="your_api_key"):
    """Worker process for processing queue jobs"""
    queue = QwenWorkflowQueue(redis_url, api_key)

    logger.info("Worker started, waiting for jobs...")

    while True:
        try:
            job = queue.get_next_job(timeout=60)

            if job:
                queue.process_job(job)
            else:
                logger.debug("No jobs available, continuing...")

        except KeyboardInterrupt:
            logger.info("Worker stopped by user")
            break
        except Exception as e:
            logger.error(f"Worker error: {e}")
            time.sleep(5)  # Prevent rapid error loops

# Usage example
if __name__ == "__main__":
    # Initialize queue
    queue = QwenWorkflowQueue(redis_url="redis://localhost:6379", api_key="your_api_key")

    # Submit some jobs
    jobs = [
        ImageEditJob(
            job_id="job_001",
            image_id="img_001",
            prompt="Convert to vintage style",
            params={"steps": 50},
            priority=2,  # High priority
            webhook_url="https://your-app.com/webhook"
        ),
        ImageEditJob(
            job_id="job_002",
            image_id="img_002",
            prompt="Add dramatic lighting",
            params={"steps": 40},
            priority=5  # Normal priority
        )
    ]

    for job in jobs:
        queue.submit_job(job)

    # Start worker (in production, run as separate process/service)
    worker_process()
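
The queue above serializes jobs with `pickle`. Since `pickle.loads` can execute arbitrary code when fed untrusted bytes, JSON is a safer wire format whenever the Redis instance is shared or reachable by other services. The sketch below shows one way to do that: it repeats the `ImageEditJob` dataclass from above so the example is self-contained, and `serialize_job` and `deserialize_job` are hypothetical helper names.

```python
import json
import time
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ImageEditJob:
    job_id: str
    image_id: str
    prompt: str
    params: dict
    priority: int = 5
    retries: int = 0
    max_retries: int = 3
    created_at: Optional[float] = None
    webhook_url: Optional[str] = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = time.time()


def serialize_job(job):
    """Encode a job as a JSON string suitable for LPUSH."""
    return json.dumps(asdict(job))


def deserialize_job(raw):
    """Decode a job from the bytes or str returned by BRPOP."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return ImageEditJob(**json.loads(raw))
```

This works as long as `params` holds only JSON-compatible values such as numbers, strings, lists, and dictionaries.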

Serverless Implementation with AWS Lambda

Deploy scalable, cost-effective processing:

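import json
import logging
import boto3
import os
import base64
import tempfile
import requests
from urllib.parse import unquote_plus

# QwenImageUploader and QwenImageEditor are the classes defined earlier in
# this guide; package them with the function code
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)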

# Lambda handler for AWS
def lambda_handler(event, context):
    """AWS Lambda handler for Qwen image editing"""

    # Initialize clients
    s3_client = boto3.client('s3')
    api_key = os.getenv('QWEN_API_KEY')

    if not api_key:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'QWEN_API_KEY not configured'})
        }

    try:
        # Parse event (expected from S3 trigger or direct invocation)
        if event.get('Records'):
            # S3 trigger event
            return handle_s3_event(event, s3_client, api_key)
        else:
            # Direct invocation
            return handle_direct_invocation(event, s3_client, api_key)

    except Exception as e:
        logger.error(f"Lambda execution failed: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def handle_s3_event(event, s3_client, api_key):
    """Handle S3 trigger event"""

    results = []

    for record in event['Records']:
        s3_info = record['s3']
        bucket_name = s3_info['bucket']['name']
        object_key = unquote_plus(s3_info['object']['key'])

        try:
            # Download image from S3
            with tempfile.NamedTemporaryFile() as temp_file:
                s3_client.download_file(bucket_name, object_key, temp_file.name)

                # Upload to Qwen API
                uploader = QwenImageUploader(api_key)
                upload_result = uploader.upload_image_from_file(temp_file.name)
                qwen_image_id = upload_result['image_id']

                # Process image. extract_prompt_from_metadata is a helper you must
                # supply yourself (for example, reading the prompt from S3 object metadata)
                prompt = extract_prompt_from_metadata(bucket_name, object_key)

                editor = QwenImageEditor(api_key)
                edit_result = editor.edit_image_sync(
                    image_id=qwen_image_id,
                    prompt=prompt
                )

                # Download result and upload back to S3
                output_key = f"processed/{object_key}"

                if edit_result.get('output_url'):
                    # Download processed image
                    response = requests.get(edit_result['output_url'])
                    response.raise_for_status()

                    # Upload to S3
                    s3_client.put_object(
                        Bucket=bucket_name,
                        Key=output_key,
                        Body=response.content,
                        ContentType='image/jpeg'
                    )

                    results.append({
                        'original': object_key,
                        'processed': output_key,
                        'status': 'success'
                    })

        except Exception as e:
            logger.error(f"Failed to process {object_key}: {e}")
            results.append({
                'original': object_key,
                'error': str(e),
                'status': 'failed'
            })

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed': len([r for r in results if r['status'] == 'success']),
            'failed': len([r for r in results if r['status'] == 'failed']),
            'results': results
        })
    }

def handle_direct_invocation(event, s3_client, api_key):
    """Handle direct Lambda invocation"""

    # Expected event structure
    image_url = event.get('image_url')
    prompt = event.get('prompt')
    output_bucket = event.get('output_bucket')
    output_key = event.get('output_key')

    if not all([image_url, prompt, output_bucket, output_key]):
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Missing required parameters'})
        }

    try:
        # Download image from URL
        response = requests.get(image_url)
        response.raise_for_status()

        with tempfile.NamedTemporaryFile() as temp_file:
            temp_file.write(response.content)
            temp_file.flush()

            # Upload to Qwen API
            uploader = QwenImageUploader(api_key)
            upload_result = uploader.upload_image_from_file(temp_file.name)
            qwen_image_id = upload_result['image_id']

            # Process image
            editor = QwenImageEditor(api_key)
            edit_result = editor.edit_image_sync(
                image_id=qwen_image_id,
                prompt=prompt
            )

            # Download and upload result
            if edit_result.get('output_url'):
                result_response = requests.get(edit_result['output_url'])
                result_response.raise_for_status()

                s3_client.put_object(
                    Bucket=output_bucket,
                    Key=output_key,
                    Body=result_response.content,
                    ContentType='image/jpeg'
                )

                return {
                    'statusCode': 200,
                    'body': json.dumps({
                        'status': 'success',
                        'output_url': f"s3://{output_bucket}/{output_key}",
                        'processing_time': edit_result.get('processing_time')
                    })
                }
            else:
                return {
                    'statusCode': 500,
                    'body': json.dumps({'error': 'No output URL received from Qwen API'})
                }

    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

# serverless.yml configuration
serverless_config = """
service: qwen-image-processor

provider:
  name: aws
  runtime: python3.9
  region: us-east-1
  timeout: 300  # 5 minutes
  memorySize: 1024

  environment:
    QWEN_API_KEY: ${ssm:/qwen/api_key}

functions:
  processImage:
    handler: lambda_function.lambda_handler
    events:
      - s3:
          bucket: your-input-bucket
          event: s3:ObjectCreated:*
          existing: true

    layers:
      - ${cf:common-layers-stack.Outputs.PythonRequirementsLayerArn}

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    layer: true
"""

# deployment script
def deploy_lambda():
    """Deploy Lambda function"""
    import subprocess

    # Install Serverless Framework
    subprocess.run(["npm", "install", "-g", "serverless"], check=True)

    # Deploy
    subprocess.run(["serverless", "deploy"], check=True)

    print("Lambda deployment completed!")
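
One caveat with the S3 trigger above: `handle_s3_event` writes its results under `processed/` in the same bucket that fires the function, so each output object creates a new `ObjectCreated` event and the function invokes itself again. A prefix filter on the trigger or a separate output bucket avoids this. As a defensive sketch in the handler itself, the hypothetical `keys_to_process` helper below skips keys the function wrote.

```python
from urllib.parse import unquote_plus

PROCESSED_PREFIX = "processed/"  # matches the output_key prefix used above


def keys_to_process(event):
    """Yield (bucket, key) pairs from an S3 event, skipping our own outputs."""
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # S3 event keys are URL-encoded, with spaces sent as "+"
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.startswith(PROCESSED_PREFIX):
            continue  # written by this function; processing it again would loop
        yield bucket, key
```

The loop in `handle_s3_event` could then iterate over `keys_to_process(event)` instead of reading `event['Records']` directly.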

Production Deployment and Monitoring

Deploy your Qwen AI integration to production with proper monitoring and scaling strategies.

Docker Containerization

Containerize your application for consistent deployment:

# Dockerfile
FROM python:3.9-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD python health_check.py

# Expose port
EXPOSE 8000

# Start application
CMD ["python", "app.py"]

# docker-compose.yml
version: '3.8'

services:
  qwen-api-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - QWEN_API_KEY=${QWEN_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://user:password@postgres:5432/qwen_db
    depends_on:
      - redis
      - postgres
    volumes:
      - ./logs:/app/logs
      - ./uploads:/app/uploads
    restart: unless-stopped
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1.0'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 1G

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=qwen_db
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - qwen-api-service
    restart: unless-stopped

volumes:
  redis_data:
  postgres_data:
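
The Dockerfile's HEALTHCHECK runs health_check.py, which is not shown in this guide. A minimal sketch of what such a script might look like follows, using only the standard library. The URL is an assumption: it presumes the application listens on port 8000 (as exposed above) and serves a /health route.

```python
# health_check.py (hypothetical sketch; the original script is not shown)
import urllib.error
import urllib.request

# Assumption: the app listens on port 8000 and serves /health
HEALTH_URL = "http://localhost:8000/health"


def is_healthy(url: str = HEALTH_URL, timeout: float = 5.0) -> bool:
    """Return True only if the endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure, or a 4xx/5xx response
        return False


def main() -> int:
    """Map the probe result to the exit code Docker expects (0 = healthy)."""
    return 0 if is_healthy() else 1
```

To use it as the container health check, end the file with `import sys` and `sys.exit(main())` so that Docker sees exit code 0 for healthy and 1 for unhealthy.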

Kubernetes Deployment

Scale your application with Kubernetes:

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: qwen-api-service
  labels:
    app: qwen-api-service
spec:
  replicas: 5
  selector:
    matchLabels:
      app: qwen-api-service
  template:
    metadata:
      labels:
        app: qwen-api-service
    spec:
      containers:
      - name: qwen-api
        image: your-registry/qwen-api-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: QWEN_API_KEY
          valueFrom:
            secretKeyRef:
              name: qwen-secrets
              key: api-key
        - name: REDIS_URL
          value: "redis://redis-service:6379"
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secrets
              key: database-url
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        volumeMounts:
        - name: uploads
          mountPath: /app/uploads
      volumes:
      - name: uploads
        persistentVolumeClaim:
          claimName: uploads-pvc

---
apiVersion: v1
kind: Service
metadata:
  name: qwen-api-service
spec:
  selector:
    app: qwen-api-service
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: qwen-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: qwen-api-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

Monitoring and Logging

Implement comprehensive monitoring:

import json
import os
import time
import logging
import psutil
import requests
from prometheus_client import Counter, Histogram, Gauge, start_http_server
from functools import wraps

# Prometheus metrics
REQUEST_COUNT = Counter('qwen_api_requests_total', 'Total API requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('qwen_api_request_duration_seconds', 'API request duration')
ACTIVE_JOBS = Gauge('qwen_active_jobs', 'Number of active processing jobs')
SYSTEM_MEMORY = Gauge('system_memory_usage_bytes', 'System memory usage')
SYSTEM_CPU = Gauge('system_cpu_usage_percent', 'System CPU usage')

class QwenAPIMonitor:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.setup_logging()
        self.setup_metrics()

    def setup_logging(self):
        """Configure structured logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/app/logs/api.log'),
                logging.StreamHandler()
            ]
        )

        # Plain-text formatter; swap in a JSON formatter for structured logging
        formatter = logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s %(message)s'
        )

        # Add log handler for monitoring systems
        monitoring_handler = logging.StreamHandler()
        monitoring_handler.setFormatter(formatter)
        self.logger.addHandler(monitoring_handler)

    def setup_metrics(self):
        """Start Prometheus metrics server"""
        start_http_server(8001)
        self.logger.info("Prometheus metrics server started on port 8001")

    def track_system_metrics(self):
        """Track system resource usage"""
        # Memory usage
        memory = psutil.virtual_memory()
        SYSTEM_MEMORY.set(memory.used)

        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)
        SYSTEM_CPU.set(cpu_percent)

    def log_request(self, method, endpoint, status, duration, user_id=None):
        """Log API request with metrics"""
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
        REQUEST_DURATION.observe(duration)

        log_data = {
            'method': method,
            'endpoint': endpoint,
            'status': status,
            'duration': duration,
            'timestamp': time.time()
        }

        if user_id:
            log_data['user_id'] = user_id

        self.logger.info(f"API Request: {json.dumps(log_data)}")

def monitor_performance(func):
    """Decorator to monitor function performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)

            duration = time.time() - start_time
            logging.info(f"{func.__name__} completed in {duration:.2f}s")

            return result

        except Exception as e:
            duration = time.time() - start_time
            logging.error(f"{func.__name__} failed after {duration:.2f}s: {e}")
            raise

    return wrapper

# Application monitoring
class QwenApplicationMonitor:
    def __init__(self):
        self.monitor = QwenAPIMonitor()
        self.health_status = True

    def health_check(self):
        """Comprehensive health check"""
        checks = {
            'api_connection': self._check_api_connection(),
            'database_connection': self._check_database_connection(),
            'redis_connection': self._check_redis_connection(),
            'disk_space': self._check_disk_space(),
            'memory_usage': self._check_memory_usage()
        }

        all_healthy = all(checks.values())

        return {
            'status': 'healthy' if all_healthy else 'unhealthy',
            'checks': checks,
            'timestamp': time.time()
        }

    def _check_api_connection(self):
        """Check Qwen API connection"""
        try:
            # Test API authentication
            response = requests.get(
                "https://api.qwen.ai/v1/auth/validate",
                headers={"Authorization": f"Bearer {os.getenv('QWEN_API_KEY')}"},
                timeout=10
            )
            return response.status_code == 200
        except requests.RequestException:
            return False

    def _check_database_connection(self):
        """Check database connection"""
        try:
            # Implement database health check
            return True  # Placeholder
        except Exception:
            return False

    def _check_redis_connection(self):
        """Check Redis connection"""
        try:
            import redis
            client = redis.from_url(os.getenv('REDIS_URL', 'redis://localhost:6379'))
            client.ping()
            return True
        except Exception:
            return False

    def _check_disk_space(self):
        """Check available disk space"""
        disk = psutil.disk_usage('/')
        free_percent = (disk.free / disk.total) * 100
        return free_percent > 10  # At least 10% free

    def _check_memory_usage(self):
        """Check memory usage"""
        memory = psutil.virtual_memory()
        return memory.percent < 90  # Less than 90% usage

# Usage in Flask application
from flask import Flask, Response, jsonify

app = Flask(__name__)
monitor = QwenApplicationMonitor()

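@app.route('/health')
def health_check():
    result = monitor.health_check()
    # Return 503 when any check fails so probes and load balancers notice
    status_code = 200 if result['status'] == 'healthy' else 503
    return jsonify(result), status_code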

@app.route('/metrics')
def metrics():
    """Prometheus metrics endpoint"""
    from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)

# Monitor system metrics periodically
import threading

def update_system_metrics():
    while True:
        monitor.track_system_metrics()
        time.sleep(30)  # Update every 30 seconds

# Start background metrics collection
metrics_thread = threading.Thread(target=update_system_metrics, daemon=True)
metrics_thread.start()
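
The setup_logging method above refers to a JSON formatter for structured logging, but its format string produces plain text. The sketch below shows one way to emit one JSON object per log line using only the standard library. The class name `JsonFormatter`, the helper `build_json_logger`, and the logger name `qwen_api` are assumptions for illustration.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "logger": record.name,
            "level": record.levelname,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Include the traceback text when logging inside an except block
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def build_json_logger(name: str = "qwen_api") -> logging.Logger:
    """Return a logger whose stream handler emits JSON lines."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        # Attach the handler once, even if this function is called again
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    # Avoid duplicate lines from handlers attached to the root logger
    logger.propagate = False
    return logger
```

With this in place, a call such as `build_json_logger().info("job %s done", job_id)` produces a line that log aggregators can parse without custom regular expressions.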

Testing and Quality Assurance

Implement comprehensive testing strategies for reliable API integration.

Unit Testing

Test individual components in isolation:

import unittest
from unittest.mock import Mock, patch, MagicMock
import json
import tempfile
import os

class TestQwenImageUploader(unittest.TestCase):
    def setUp(self):
        self.api_key = "test_api_key"
        self.uploader = QwenImageUploader(self.api_key)

        # Mock sample image
        self.sample_image_path = self._create_sample_image()

    def _create_sample_image(self):
        """Create a sample test image"""
        from PIL import Image

        with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
            # Create a simple test image
            img = Image.new('RGB', (100, 100), color='red')
            img.save(f.name, 'JPEG')
            return f.name

    def tearDown(self):
        # Clean up test files
        if os.path.exists(self.sample_image_path):
            os.unlink(self.sample_image_path)

    @patch('requests.post')
    def test_upload_image_success(self, mock_post):
        """Test successful image upload"""
        # Mock successful API response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            'image_id': 'test_image_id',
            'temporary_url': 'https://temp.url/image.jpg'
        }
        mock_post.return_value = mock_response

        # Test upload
        result = self.uploader.upload_image_from_file(self.sample_image_path)

        # Assertions
        self.assertEqual(result['image_id'], 'test_image_id')
        self.assertIn('temporary_url', result)

        # Verify API was called correctly
        mock_post.assert_called_once()
        call_args = mock_post.call_args

        # Check headers
        self.assertEqual(call_args[1]['headers']['Authorization'], 'Bearer test_api_key')
        self.assertEqual(call_args[1]['headers']['Content-Type'], 'application/json')

        # Check request body contains image data
        # The json= keyword argument is passed as a dict, so no decoding is needed
        request_data = call_args[1]['json']
        self.assertIn('image', request_data)
        self.assertTrue(request_data['image'].startswith('data:image/jpeg;base64,'))

    @patch('requests.post')
    def test_upload_image_api_failure(self, mock_post):
        """Test API failure handling"""
        mock_post.return_value.status_code = 400
        mock_post.return_value.text = "Invalid image format"

        with self.assertRaises(Exception) as context:
            self.uploader.upload_image_from_file(self.sample_image_path)

        self.assertIn("Upload failed", str(context.exception))

    def test_invalid_image_file(self):
        """Test handling of invalid image files"""
        # Create invalid image file
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
            f.write("This is not an image")
            invalid_path = f.name

        try:
            with self.assertRaises(ValueError):
                self.uploader.upload_image_from_file(invalid_path)
        finally:
            os.unlink(invalid_path)

class TestQwenImageEditor(unittest.TestCase):
    def setUp(self):
        self.api_key = "test_api_key"
        self.editor = QwenImageEditor(self.api_key)
        self.test_image_id = "test_image_123"

    @patch('requests.post')
    def test_edit_image_success(self, mock_post):
        """Test successful image editing"""
        # Mock successful response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            'status': 'completed',
            'output_url': 'https://output.url/edited_image.jpg',
            'processing_time': 15.2
        }
        mock_post.return_value = mock_response

        # Test edit
        result = self.editor.edit_image(
            image_id=self.test_image_id,
            prompt="Convert to black and white"
        )

        # Assertions
        self.assertEqual(result['status'], 'completed')
        self.assertIn('output_url', result)

        # Verify API call
        mock_post.assert_called_once()
        call_args = mock_post.call_args[1]

        # Check request parameters
        request_data = call_args['json']
        self.assertEqual(request_data['image_id'], self.test_image_id)
        self.assertEqual(request_data['prompt'], "Convert to black and white")
        self.assertEqual(request_data['num_inference_steps'], 50)  # Default value
        self.assertEqual(request_data['guidance_scale'], 7.5)  # Default value

    @patch('requests.post')
    @patch('time.sleep')
    def test_edit_image_sync_with_polling(self, mock_sleep, mock_post):
        """Test synchronous editing with job polling"""

        # Mock initial response (processing)
        processing_response = Mock()
        processing_response.status_code = 200
        processing_response.json.return_value = {
            'status': 'processing',
            'job_id': 'job_123'
        }

        # Mock status check response (job completed)
        status_response = Mock()
        status_response.status_code = 200
        status_response.json.return_value = {
            'status': 'completed',
            'output_url': 'https://output.url/edited_image.jpg'
        }

        # Configure mock to return different responses
        mock_post.side_effect = [
            processing_response,  # Initial edit request
            status_response,      # Status check
        ]

        # Test synchronous edit
        result = self.editor.edit_image_sync(
            image_id=self.test_image_id,
            prompt="Test edit"
        )

        # Assertions
        self.assertEqual(result['status'], 'completed')
        self.assertIn('output_url', result)

class TestQwenBatchProcessor(unittest.TestCase):
    def setUp(self):
        self.api_key = "test_api_key"
        self.processor = QwenBatchProcessor(self.api_key, max_workers=2)

    @patch.object(QwenBatchProcessor, 'process_single_job')
    def test_process_batch_success(self, mock_process):
        """Test successful batch processing"""

        # Mock successful job processing
        mock_process.return_value = {
            'success': True,
            'image_id': 'test_img',
            'result': {'output_url': 'test_url'}
        }

        # Define test jobs
        jobs = [
            {'image_id': 'img_1', 'prompt': 'Edit 1', 'params': {}},
            {'image_id': 'img_2', 'prompt': 'Edit 2', 'params': {}}
        ]

        # Process batch
        results = self.processor.process_batch(jobs)

        # Assertions
        self.assertEqual(len(results), 2)
        self.assertTrue(all(r['success'] for r in results))
        self.assertEqual(mock_process.call_count, 2)

    @patch.object(QwenBatchProcessor, 'process_single_job')
    def test_process_batch_with_failures(self, mock_process):
        """Test batch processing with some failures"""

        # Mock mixed results
        mock_process.side_effect = [
            {'success': True, 'image_id': 'img_1', 'result': {}},
            {'success': False, 'image_id': 'img_2', 'error': 'API Error'},
            {'success': True, 'image_id': 'img_3', 'result': {}}
        ]

        jobs = [
            {'image_id': 'img_1', 'prompt': 'Edit 1', 'params': {}},
            {'image_id': 'img_2', 'prompt': 'Edit 2', 'params': {}},
            {'image_id': 'img_3', 'prompt': 'Edit 3', 'params': {}}
        ]

        results = self.processor.process_batch(jobs)

        # Assertions
        self.assertEqual(len(results), 3)
        self.assertEqual(len([r for r in results if r['success']]), 2)
        self.assertEqual(len([r for r in results if not r['success']]), 1)

if __name__ == '__main__':
    unittest.main()

Integration Testing

Test complete workflows end-to-end:

import io
import os
import tempfile
import time

import pytest
import requests
from PIL import Image

class TestQwenAPIIntegration:
    """Integration tests for Qwen API"""

    @pytest.fixture(scope="class")
    def api_client(self):
        """Create API client for testing"""
        api_key = os.getenv('QWEN_TEST_API_KEY')
        if not api_key:
            pytest.skip("QWEN_TEST_API_KEY not set")

        return QwenImageEditor(api_key)

    @pytest.fixture(scope="class")
    def test_image(self):
        """Create a temporary test image and remove it after the tests"""
        with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as f:
            path = f.name

        # Write the image once the handle is closed, then clean up on teardown
        Image.new('RGB', (512, 512), color='blue').save(path, 'JPEG')
        yield path
        os.unlink(path)

    def test_complete_workflow(self, api_client, test_image):
        """Test complete image editing workflow"""

        # Step 1: Upload image
        uploader = QwenImageUploader(api_client.api_key)
        upload_result = uploader.upload_image_from_file(test_image)

        assert 'image_id' in upload_result
        image_id = upload_result['image_id']

        # Step 2: Edit image
        edit_result = api_client.edit_image_sync(
            image_id=image_id,
            prompt="Convert to grayscale",
            steps=30  # Faster for testing
        )

        assert edit_result['status'] == 'completed'
        assert 'output_url' in edit_result

        # Step 3: Download and verify result
        response = requests.get(edit_result['output_url'])
        response.raise_for_status()

        # Verify we got a valid image
        result_img = Image.open(io.BytesIO(response.content))
        assert result_img.size == (512, 512)  # Check dimensions

    def test_batch_processing_workflow(self, api_client, test_image):
        """Test batch processing workflow"""

        # Upload multiple images
        uploader = QwenImageUploader(api_client.api_key)
        image_ids = []

        for i in range(3):
            result = uploader.upload_image_from_file(test_image)
            image_ids.append(result['image_id'])

        # Process batch
        batch_processor = QwenBatchProcessor(api_client.api_key, max_workers=2)

        jobs = [
            {
                'image_id': img_id,
                'prompt': f"Apply style {i+1}",
                'params': {'steps': 25}
            }
            for i, img_id in enumerate(image_ids)
        ]

        results = batch_processor.process_batch(jobs)

        # Verify results
        assert len(results) == 3
        assert len([r for r in results if r['success']]) >= 2  # Allow for occasional failures

    @pytest.mark.slow
    def test_performance_limits(self, api_client, test_image):
        """Test performance and rate limits"""

        uploader = QwenImageUploader(api_client.api_key)

        # Test upload performance
        start_time = time.time()

        upload_results = []
        for i in range(5):  # Test 5 sequential uploads
            result = uploader.upload_image_from_file(test_image)
            upload_results.append(result)

        upload_time = time.time() - start_time

        # Upload should complete within reasonable time
        assert upload_time < 60  # 60 seconds max
        assert len(upload_results) == 5

        # Test editing performance
        editor = QwenImageEditor(api_client.api_key)

        edit_start = time.time()

        for result in upload_results:
            edit_result = editor.edit_image_sync(
                image_id=result['image_id'],
                prompt="Quick test edit",
                steps=20  # Fast for testing
            )
            assert edit_result['status'] == 'completed'

        edit_time = time.time() - edit_start

        # Edits should complete within reasonable time
        assert edit_time < 300  # 5 minutes max

# Configuration for pytest
# conftest.py
pytest_plugins = ["pytest_asyncio"]

def pytest_configure(config):
    """Configure pytest"""
    config.addinivalue_line(
        "markers", "slow: marks tests as slow (deselect with '-m \"not slow\"')"
    )
    config.addinivalue_line(
        "markers", "integration: marks tests as integration tests"
    )

Load Testing

Test system performance under load:

import asyncio
import aiohttp
import time
import statistics

class QwenLoadTester:
    def __init__(self, api_key, base_url="https://api.qwen.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.results = []

    async def single_request(self, session, request_data):
        """Single async request"""
        start_time = time.time()

        try:
            async with session.post(
                f"{self.base_url}/images/edit",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                },
                json=request_data
            ) as response:

                end_time = time.time()
                duration = end_time - start_time

                result = {
                    'success': response.status == 200,
                    'status_code': response.status,
                    'duration': duration,
                    'timestamp': start_time
                }

                if response.status == 200:
                    result['response'] = await response.json()
                else:
                    result['error'] = await response.text()

                return result

        except Exception as e:
            end_time = time.time()
            return {
                'success': False,
                'error': str(e),
                'duration': end_time - start_time,
                'timestamp': start_time
            }

    async def load_test_concurrent(self, concurrent_users=10, requests_per_user=5):
        """Load test with concurrent users"""

        # Create test data
        test_requests = [
            {
                'image_id': f'test_img_{i}',
                'prompt': f'Test edit {i}',
                'num_inference_steps': 20  # Faster for testing
            }
            for i in range(concurrent_users * requests_per_user)
        ]

        async with aiohttp.ClientSession() as session:
            tasks = []

            # Create tasks for concurrent users
            for user_id in range(concurrent_users):
                user_tasks = []

                for req_id in range(requests_per_user):
                    request_idx = user_id * requests_per_user + req_id
                    task = self.single_request(session, test_requests[request_idx])
                    user_tasks.append(task)

                tasks.extend(user_tasks)

            # Execute all tasks concurrently
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Filter out exceptions
            valid_results = [r for r in results if isinstance(r, dict)]
            self.results.extend(valid_results)

            return valid_results

    def analyze_results(self):
        """Analyze load test results"""

        if not self.results:
            return {}

        # Basic statistics
        total_requests = len(self.results)
        successful_requests = len([r for r in self.results if r['success']])
        failed_requests = total_requests - successful_requests

        durations = [r['duration'] for r in self.results if r['success']]

        analysis = {
            'total_requests': total_requests,
            'successful_requests': successful_requests,
            'failed_requests': failed_requests,
            'success_rate': (successful_requests / total_requests) * 100,
            'duration_stats': {
                'min': min(durations) if durations else 0,
                'max': max(durations) if durations else 0,
                'mean': statistics.mean(durations) if durations else 0,
                'median': statistics.median(durations) if durations else 0,
                'stdev': statistics.stdev(durations) if len(durations) > 1 else 0
            }
        }

        # Performance percentiles
        if durations:
            durations.sort()
            analysis['percentiles'] = {
                'p50': durations[int(len(durations) * 0.5)],
                'p90': durations[int(len(durations) * 0.9)],
                'p95': durations[int(len(durations) * 0.95)],
                'p99': durations[int(len(durations) * 0.99)]
            }

        # Throughput calculation
        if self.results:
            time_span = max(r['timestamp'] for r in self.results) - min(r['timestamp'] for r in self.results)
            analysis['throughput'] = {
                'requests_per_second': total_requests / time_span if time_span > 0 else 0,
                'successful_rps': successful_requests / time_span if time_span > 0 else 0
            }

        return analysis

    def generate_report(self, output_file="load_test_report.html"):
        """Generate HTML report"""

        analysis = self.analyze_results()

        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Qwen API Load Test Report</title>
            <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .metric {{ background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 5px; }}
                .chart {{ margin: 20px 0; }}
                table {{ border-collapse: collapse; width: 100%; }}
                th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                th {{ background-color: #f2f2f2; }}
            </style>
        </head>
        <body>
            <h1>Qwen API Load Test Report</h1>

            <div class="metric">
                <h2>Summary</h2>
                <table>
                    <tr><th>Metric</th><th>Value</th></tr>
                    <tr><td>Total Requests</td><td>{total_requests}</td></tr>
                    <tr><td>Success Rate</td><td>{success_rate:.2f}%</td></tr>
                    <tr><td>Average Duration</td><td>{avg_duration:.2f}s</td></tr>
                    <tr><td>Throughput</td><td>{throughput:.2f} req/s</td></tr>
                </table>
            </div>

            <div class="chart">
                <h2>Response Time Distribution</h2>
                <canvas id="durationChart" width="400" height="200"></canvas>
            </div>

            <div class="chart">
                <h2>Success Rate Over Time</h2>
                <canvas id="successChart" width="400" height="200"></canvas>
            </div>

            <script>
                // Duration distribution chart
                const durationCtx = document.getElementById('durationChart').getContext('2d');
                new Chart(durationCtx, {{
                    type: 'bar',
                    data: {{
                        labels: {duration_labels},
                        datasets: [{{
                            label: 'Response Times',
                            data: {duration_data},
                            backgroundColor: 'rgba(54, 162, 235, 0.6)'
                        }}]
                    }}
                }});

                // Success rate chart
                const successCtx = document.getElementById('successChart').getContext('2d');
                new Chart(successCtx, {{
                    type: 'line',
                    data: {{
                        labels: {time_labels},
                        datasets: [{{
                            label: 'Success Rate %',
                            data: {success_data},
                            borderColor: 'rgb(75, 192, 192)',
                            tension: 0.1
                        }}]
                    }}
                }});
            </script>
        </body>
        </html>
        """

        # Prepare data for charts
        durations = [r['duration'] for r in self.results if r['success']]

        with open(output_file, 'w') as f:
            f.write(html_template.format(
                total_requests=analysis['total_requests'],
                success_rate=analysis['success_rate'],
                avg_duration=analysis['duration_stats']['mean'],
                throughput=analysis['throughput']['requests_per_second'],
                duration_labels=str([f"{d:.1f}s" for d in durations]),
                duration_data=str(durations),
                time_labels='[]',  # Would need time-series data
                success_data='[]'   # Would need success rate over time
            ))

        print(f"Load test report generated: {output_file}")

# Usage example
async def run_load_test():
    tester = QwenLoadTester("your_api_key")

    # Run load test
    results = await tester.load_test_concurrent(
        concurrent_users=20,
        requests_per_user=5
    )

    # Analyze results
    analysis = tester.analyze_results()

    print("Load test completed:")
    print(f"Total requests: {analysis['total_requests']}")
    print(f"Success rate: {analysis['success_rate']:.2f}%")
    print(f"Average duration: {analysis['duration_stats']['mean']:.2f}s")
    print(f"Throughput: {analysis['throughput']['requests_per_second']:.2f} req/s")

    # Generate report
    tester.generate_report()

if __name__ == "__main__":
    asyncio.run(run_load_test())
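
The analyze_results method above picks percentiles by truncating an index into the sorted durations, which can be coarse for small samples. As an alternative sketch, the standard library's `statistics.quantiles` interpolates between neighboring values. The function name `latency_percentiles` is an assumption for illustration.

```python
import statistics
from typing import Dict, List


def latency_percentiles(durations: List[float]) -> Dict[str, float]:
    """Return interpolated p50/p90/p95/p99 for a list of durations in seconds."""
    if not durations:
        return {}
    if len(durations) == 1:
        # quantiles() needs at least two data points on older Python versions
        only = durations[0]
        return {"p50": only, "p90": only, "p95": only, "p99": only}

    # n=100 yields 99 cut points; cut point k (1-based) is the k-th percentile
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {
        "p50": cuts[49],
        "p90": cuts[89],
        "p95": cuts[94],
        "p99": cuts[98],
    }
```

The "inclusive" method treats the sample as covering the full observed range, so every percentile stays between the fastest and slowest measured request.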

Security Best Practices

Implement robust security measures for production deployments.

API Key Management

Secure your API credentials:

import os
import json
import time
import hashlib
from cryptography.fernet import Fernet
import boto3
from botocore.exceptions import ClientError

class SecureAPIKeyManager:
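    def __init__(self, encryption_key=None):
        if encryption_key:
            self.fernet = Fernet(encryption_key.encode())
        else:
            # Generate a new encryption key. Persist self.encryption_key
            # securely, or anything encrypted with it cannot be decrypted later.
            self.encryption_key = Fernet.generate_key()
            self.fernet = Fernet(self.encryption_key)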

    def encrypt_api_key(self, api_key):
        """Encrypt API key for storage"""
        return self.fernet.encrypt(api_key.encode()).decode()

    def decrypt_api_key(self, encrypted_key):
        """Decrypt API key for use"""
        return self.fernet.decrypt(encrypted_key.encode()).decode()

    def hash_api_key(self, api_key):
        """Create hash for API key identification"""
        return hashlib.sha256(api_key.encode()).hexdigest()

class AWSSecretsManager:
    def __init__(self, region_name='us-east-1'):
        self.client = boto3.client('secretsmanager', region_name=region_name)

    def store_api_key(self, secret_name, api_key, description="Qwen API Key"):
        """Store API key in AWS Secrets Manager"""
        try:
            self.client.create_secret(
                Name=secret_name,
                Description=description,
                SecretString=json.dumps({
                    'api_key': api_key,
                    'created_at': str(time.time())
                })
            )
            print(f"Secret {secret_name} created successfully")

        except self.client.exceptions.ResourceExistsException:
            # Update existing secret
            self.client.update_secret(
                SecretId=secret_name,
                Description=description,
                SecretString=json.dumps({
                    'api_key': api_key,
                    'updated_at': str(time.time())
                })
            )
            print(f"Secret {secret_name} updated successfully")

    def get_api_key(self, secret_name):
        """Retrieve API key from AWS Secrets Manager"""
        try:
            response = self.client.get_secret_value(SecretId=secret_name)
            secret_data = json.loads(response['SecretString'])
            return secret_data['api_key']

        except ClientError as e:
            print(f"Error retrieving secret {secret_name}: {e}")
            return None

    def rotate_api_key(self, secret_name, new_api_key):
        """Rotate API key with zero downtime"""
        # Store new key
        self.store_api_key(f"{secret_name}-new", new_api_key, "New Qwen API Key")

        # Update application to use new key
        # (This would be implemented in your deployment pipeline)

        # The previous key stays under the original secret name during the grace period
        print(f"API key rotation initiated for {secret_name}")
        print(f"After the grace period, promote {secret_name}-new and retire the key stored in {secret_name}")

# Secure configuration management
class SecureConfig:
    def __init__(self, config_file='config.json'):
        self.config_file = config_file
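        # Assumption: the Fernet key is supplied via a CONFIG_ENCRYPTION_KEY
        # environment variable (hypothetical name). Without a stable key, values
        # encrypted in one run cannot be decrypted in the next.
        self.key_manager = SecureAPIKeyManager(os.getenv('CONFIG_ENCRYPTION_KEY'))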
        self.config = self._load_config()

    def _load_config(self):
        """Load configuration from encrypted file"""
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r') as f:
                encrypted_config = json.load(f)

            # Decrypt configuration
            config = {}
            for key, value in encrypted_config.items():
                if key.endswith('_encrypted'):
                    config[key.replace('_encrypted', '')] = self.key_manager.decrypt_api_key(value)
                else:
                    config[key] = value

            return config
        else:
            return self._create_default_config()

    def _create_default_config(self):
        """Create default configuration"""
        config = {
            'qwen_api_key': os.getenv('QWEN_API_KEY'),
            'base_url': 'https://api.qwen.ai/v1',
            'timeout': 300,
            'max_retries': 3,
            'rate_limit_per_minute': 60
        }

        self._save_config(config)
        return config

    def _save_config(self, config):
        """Save configuration (encrypt sensitive values)"""
        encrypted_config = {}

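        for key, value in config.items():
            is_sensitive = 'api_key' in key.lower() or 'secret' in key.lower()
            # Skip encryption for unset values (e.g. QWEN_API_KEY missing from the environment)
            if is_sensitive and value is not None:
                encrypted_config[f"{key}_encrypted"] = self.key_manager.encrypt_api_key(value)
            else:
                encrypted_config[key] = value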

        with open(self.config_file, 'w') as f:
            json.dump(encrypted_config, f, indent=2)

    def get(self, key, default=None):
        """Get configuration value"""
        return self.config.get(key, default)

    def set(self, key, value):
        """Set configuration value"""
        self.config[key] = value
        self._save_config(self.config)

# Usage
config = SecureConfig()
api_key = config.get('qwen_api_key')

# Or use AWS Secrets Manager
secrets_manager = AWSSecretsManager()
api_key = secrets_manager.get_api_key('qwen/prod/api-key')
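
The `hash_api_key` method above gives you a way to identify a key without storing it in plain text. The same idea helps with logging and comparison. Below is a minimal standard-library sketch; `fingerprint`, `mask_key`, and `keys_match` are hypothetical helper names, not part of any Qwen SDK.

```python
import hashlib
import hmac


def fingerprint(api_key: str, length: int = 12) -> str:
    """Short SHA-256 prefix that identifies a key in logs without revealing it."""
    return hashlib.sha256(api_key.encode()).hexdigest()[:length]


def mask_key(api_key: str, visible: int = 4) -> str:
    """Hide everything except the last few characters of a key."""
    if len(api_key) <= visible:
        return "*" * len(api_key)
    return "*" * (len(api_key) - visible) + api_key[-visible:]


def keys_match(candidate: str, expected: str) -> bool:
    """Compare two keys in constant time to avoid timing side channels."""
    return hmac.compare_digest(candidate.encode(), expected.encode())
```

Logging `fingerprint(api_key)` instead of the key itself lets you correlate requests with a credential without the secret ever reaching log storage.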

Input Validation and Sanitization

Validate all inputs to prevent security issues:

import os  # needed for os.path.splitext in validate_image_file
import re
import bleach
from PIL import Image
import io
import base64

class InputValidator:
    """Comprehensive input validation for Qwen API"""

    # Allowed file extensions
    ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.webp', '.bmp', '.tiff'}

    # Maximum file size (10MB)
    MAX_FILE_SIZE = 10 * 1024 * 1024

    # Maximum image dimensions
    MAX_DIMENSIONS = (4096, 4096)

    # Minimum dimensions
    MIN_DIMENSIONS = (64, 64)

    # Prompt validation
    MAX_PROMPT_LENGTH = 1000
    BANNED_WORDS = ['violent', 'illegal', 'harmful', 'adult']  # Example

    @classmethod
    def validate_image_file(cls, file_data, filename):
        """Validate uploaded image file"""
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }

        # Check file size
        if len(file_data) > cls.MAX_FILE_SIZE:
            validation_result['valid'] = False
            validation_result['errors'].append(f"File size exceeds {cls.MAX_FILE_SIZE} bytes")

        # Check file extension
        file_ext = os.path.splitext(filename)[1].lower()
        if file_ext not in cls.ALLOWED_EXTENSIONS:
            validation_result['valid'] = False
            validation_result['errors'].append(f"File extension {file_ext} not allowed")

        # Validate image content
        try:
            image = Image.open(io.BytesIO(file_data))

            # Check dimensions
            if image.size[0] < cls.MIN_DIMENSIONS[0] or image.size[1] < cls.MIN_DIMENSIONS[1]:
                validation_result['valid'] = False
                validation_result['errors'].append(f"Image too small: {image.size}")

            if image.size[0] > cls.MAX_DIMENSIONS[0] or image.size[1] > cls.MAX_DIMENSIONS[1]:
                validation_result['warnings'].append(f"Image will be resized from {image.size}")

            # Check image mode
            if image.mode not in ['RGB', 'RGBA']:
                validation_result['warnings'].append(f"Image mode {image.mode} will be converted to RGB")

        except Exception as e:
            validation_result['valid'] = False
            validation_result['errors'].append(f"Invalid image file: {e}")

        return validation_result

    @classmethod
    def validate_prompt(cls, prompt):
        """Validate edit prompt"""
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'sanitized_prompt': prompt
        }

        # Check length
        if len(prompt) > cls.MAX_PROMPT_LENGTH:
            validation_result['valid'] = False
            validation_result['errors'].append(f"Prompt too long: {len(prompt)} > {cls.MAX_PROMPT_LENGTH}")

        # Check for banned words
        lower_prompt = prompt.lower()
        banned_found = [word for word in cls.BANNED_WORDS if word in lower_prompt]

        if banned_found:
            validation_result['valid'] = False
            validation_result['errors'].append(f"Prompt contains prohibited content: {banned_found}")

        # Sanitize prompt (remove potentially harmful content)
        sanitized = bleach.clean(prompt, tags=[], strip=True)
        sanitized = re.sub(r'[<>]', '', sanitized)  # Remove brackets

        if sanitized != prompt:
            validation_result['warnings'].append("Prompt was sanitized for security")
            validation_result['sanitized_prompt'] = sanitized

        return validation_result

    @classmethod
    def validate_base64_image(cls, base64_data):
        """Validate base64 encoded image"""
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }

        try:
            # Remove data URL prefix if present
            if base64_data.startswith('data:image/'):
                base64_data = base64_data.split(',')[1]

            # Decode base64
            image_data = base64.b64decode(base64_data)

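            # Validate as image file. Base64 payloads carry no filename, so pass a
            # placeholder with an allowed extension; the content check does the real work.
            file_validation = cls.validate_image_file(image_data, 'base64_image.png')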

            if not file_validation['valid']:
                validation_result['valid'] = False
                validation_result['errors'].extend(file_validation['errors'])

            validation_result['warnings'].extend(file_validation['warnings'])

        except Exception as e:
            validation_result['valid'] = False
            validation_result['errors'].append(f"Invalid base64 image data: {e}")

        return validation_result

    @classmethod
    def validate_job_parameters(cls, params):
        """Validate job parameters"""
        validation_result = {
            'valid': True,
            'errors': [],
            'sanitized_params': {}
        }

        # Define parameter ranges
        param_ranges = {
            'num_inference_steps': {'min': 1, 'max': 100},
            'guidance_scale': {'min': 1.0, 'max': 20.0},
            'width': {'min': 64, 'max': 4096},
            'height': {'min': 64, 'max': 4096},
            'quality': {'min': 1, 'max': 100}
        }

        for param, value in params.items():
            if param in param_ranges:
                range_info = param_ranges[param]

                if isinstance(value, (int, float)):
                    if value < range_info['min'] or value > range_info['max']:
                        validation_result['valid'] = False
                        validation_result['errors'].append(
                            f"{param} {value} out of range [{range_info['min']}, {range_info['max']}]"
                        )

                        # Clamp to valid range
                        sanitized_value = max(range_info['min'], min(range_info['max'], value))
                        validation_result['sanitized_params'][param] = sanitized_value
                    else:
                        validation_result['sanitized_params'][param] = value
                else:
                    validation_result['valid'] = False
                    validation_result['errors'].append(f"{param} must be numeric")
            else:
                # Pass through unknown parameters (but could add validation here)
                validation_result['sanitized_params'][param] = value

        return validation_result

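# Usage in API endpoints
from flask import Flask, jsonify, request

app = Flask(__name__)  # reuse your existing Flask app if one is already defined

@app.route('/api/upload', methods=['POST'])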
def upload_image():
    """Secure image upload endpoint"""

    try:
        # Get file from request
        if 'image' not in request.files:
            return jsonify({'error': 'No image file provided'}), 400

        file = request.files['image']

        # Validate file
        validation_result = InputValidator.validate_image_file(
            file.read(),
            file.filename
        )

        if not validation_result['valid']:
            return jsonify({
                'error': 'Invalid image file',
                'details': validation_result['errors']
            }), 400

        # Process file if valid
        file.seek(0)  # Reset file pointer
        # ... continue with upload logic

        return jsonify({
            'success': True,
            'warnings': validation_result['warnings']
        })

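    except Exception:
        # Log details server-side; don't echo internal errors to the client
        app.logger.exception("Image upload failed")
        return jsonify({'error': 'Internal server error'}), 500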

@app.route('/api/edit', methods=['POST'])
def edit_image():
    """Secure image editing endpoint"""

    try:
        # Fall back to an empty dict if the body is missing or not valid JSON
        data = request.get_json(silent=True) or {}

        # Validate prompt
        prompt_validation = InputValidator.validate_prompt(data.get('prompt', ''))

        if not prompt_validation['valid']:
            return jsonify({
                'error': 'Invalid prompt',
                'details': prompt_validation['errors']
            }), 400

        # Validate parameters
        params = data.get('params', {})
        param_validation = InputValidator.validate_job_parameters(params)

        if not param_validation['valid']:
            return jsonify({
                'error': 'Invalid parameters',
                'details': param_validation['errors']
            }), 400

        # Use sanitized values
        sanitized_prompt = prompt_validation['sanitized_prompt']
        sanitized_params = param_validation['sanitized_params']

        # Continue with processing...

        return jsonify({
            'success': True,
            'prompt': sanitized_prompt,
            'params': sanitized_params,
            'warnings': prompt_validation.get('warnings', [])
        })

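    except Exception:
        # Log details server-side; don't echo internal errors to the client
        app.logger.exception("Image edit request failed")
        return jsonify({'error': 'Internal server error'}), 500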
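
Note that `validate_prompt` checks banned words by substring, so a harmless prompt containing "adulthood" would be rejected because it contains "adult". If that is too strict for your use case, whole-word matching is one alternative. The sketch below reuses the same example word list; `find_banned_terms` is a hypothetical helper name.

```python
import re

# Example terms only, mirroring the illustrative BANNED_WORDS list above.
BANNED_TERMS = ["violent", "illegal", "harmful", "adult"]

# One compiled pattern; \b anchors each term to word boundaries.
_BANNED_PATTERN = re.compile(
    r"\b(" + "|".join(re.escape(term) for term in BANNED_TERMS) + r")\b",
    re.IGNORECASE,
)


def find_banned_terms(prompt: str) -> list[str]:
    """Return the distinct banned terms that appear as whole words, lowercased."""
    return sorted({match.lower() for match in _BANNED_PATTERN.findall(prompt)})
```

The trade-off is that whole-word matching misses inflected forms such as "violently", so the word list needs to include any variants you want to block.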

Conclusion and Next Steps

This comprehensive guide has covered the essential aspects of integrating Qwen AI Image Editor into production applications. From basic API authentication to advanced automation workflows, security best practices, and monitoring strategies, you now have the foundation to build scalable, reliable image editing solutions.

Key Takeaways

  1. Start Simple: Begin with basic API integration and gradually add complexity
  2. Prioritize Security: Never hardcode credentials; always use secure storage
  3. Implement Monitoring: Track performance, errors, and usage patterns
  4. Design for Scale: Use queues, caching, and load balancing from the start
  5. Test Thoroughly: Unit tests, integration tests, and load testing are essential
  6. Plan for Failures: Implement retry logic, error handling, and circuit breakers

Suggested Implementation Timeline

  1. Week 1: Basic API integration with authentication
  2. Week 2: Implement core editing functionality and error handling
  3. Week 3: Add batch processing and workflow automation
  4. Week 4: Implement monitoring, logging, and alerting
  5. Week 5: Security hardening and compliance measures
  6. Week 6: Load testing and performance optimization
  7. Week 7: Production deployment and documentation

This article completes our three-part series on Qwen AI Image Editor. You now have everything needed to move from basic usage to advanced professional implementation with API integration and automation.

Last updated: January 2025

