Process large volumes of data efficiently with parallel and batch operations.
Batch processing is ideal for high-volume workloads made up of independent requests, such as bulk summarization or analysis. Run concurrently, these workloads can see 3-10x higher throughput than sequential processing while staying within rate limits: at 10 concurrent requests averaging 2 seconds each, 1,000 items finish in roughly 200 seconds instead of 2,000.
Process multiple requests in parallel with a cap on concurrency:
JavaScript (with p-limit)

import pLimit from 'p-limit';

const limit = pLimit(10); // Max 10 concurrent requests

async function processBatch(items) {
  const results = await Promise.all(
    items.map(item =>
      limit(() => processItem(item))
    )
  );
  return results;
}

async function processItem(item) {
  const response = await fetch('https://api.vigthoria.io/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${API_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      model: 'vigthoria-reasoning-v2',
      messages: [{ role: 'user', content: item.prompt }],
      max_tokens: 500
    })
  });

  // fetch does not reject on HTTP errors; throw so callers
  // (and the retry logic below) can inspect error.status
  if (!response.ok) {
    const error = new Error(`Request failed with status ${response.status}`);
    error.status = response.status;
    throw error;
  }

  const data = await response.json();
  return { id: item.id, result: data.choices[0].message.content };
}

// Usage
const items = [
  { id: 1, prompt: 'Summarize: ...' },
  { id: 2, prompt: 'Analyze: ...' },
  // ... 1000 more items
];

const results = await processBatch(items);
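With p-limit, the request pool stays full: exactly 10 requests are in flight at any moment, and as soon as one settles the next queued item starts. Concurrency is the main tuning knob; raise it only as far as your rate limits allow.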
Python (with aiohttp)

import asyncio
import aiohttp
from asyncio import Semaphore

async def process_batch(items, max_concurrent=10):
    semaphore = Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [process_item(session, semaphore, item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

async def process_item(session, semaphore, item):
    async with semaphore:
        async with session.post(
            'https://api.vigthoria.io/v1/chat/completions',
            headers={
                'Authorization': f'Bearer {API_KEY}',
                'Content-Type': 'application/json'
            },
            json={
                'model': 'vigthoria-reasoning-v2',
                'messages': [{'role': 'user', 'content': item['prompt']}],
                'max_tokens': 500
            }
        ) as response:
            response.raise_for_status()  # surface HTTP errors to gather()
            data = await response.json()
            return {
                'id': item['id'],
                'result': data['choices'][0]['message']['content']
            }

# Usage
items = [{'id': i, 'prompt': f'Task {i}'} for i in range(1000)]
results = asyncio.run(process_batch(items))
# With return_exceptions=True, failed requests appear as exception objects
ok = [r for r in results if not isinstance(r, Exception)]
For very large jobs, use a job queue for reliability:
Node.js with Bull Queue

const Queue = require('bull');

// Create queue (Bull persists jobs in Redis)
const batchQueue = new Queue('ai-batch', {
  redis: { host: 'localhost', port: 6379 }
});

// Add jobs
async function enqueueBatch(items) {
  const jobs = items.map(item => ({
    name: 'process-item',
    data: item,
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 2000 }
    }
  }));

  await batchQueue.addBulk(jobs);
  console.log(`Enqueued ${jobs.length} items`);
}

// Process jobs (workers) with a concurrency of 10.
// Assumes `vigthoria` is an initialized SDK client.
batchQueue.process('process-item', 10, async (job) => {
  const response = await vigthoria.chat.completions.create({
    model: 'vigthoria-reasoning-v2',
    messages: [{ role: 'user', content: job.data.prompt }]
  });

  return {
    id: job.data.id,
    result: response.choices[0].message.content
  };
});

// Monitor progress
batchQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed`);
});

batchQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});
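The worker above processes jobs as they arrive; to know when the whole batch has drained, you can poll the queue's job counts. A minimal sketch (the waitForDrain helper and its 5-second polling interval are illustrative choices, not part of Bull's API):

async function waitForDrain(queue, pollMs = 5000) {
  for (;;) {
    const counts = await queue.getJobCounts();
    // Drained once nothing is waiting, delayed, or actively processing
    if (counts.waiting + counts.active + counts.delayed === 0) {
      return counts;
    }
    await new Promise(resolve => setTimeout(resolve, pollMs));
  }
}

const counts = await waitForDrain(batchQueue);
console.log(`Batch done: ${counts.completed} completed, ${counts.failed} failed`);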
Handle transient failures with retries and exponential backoff:

const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

async function processWithRetry(item, maxRetries = 3) {
  let lastError;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const result = await processItem(item);
      return { success: true, ...result };
    } catch (error) {
      lastError = error;

      // Check if retryable
      if (error.status === 429) {
        // Rate limited - wait and retry
        const waitTime = Math.pow(2, attempt) * 1000;
        console.log(`Rate limited, waiting ${waitTime}ms`);
        await sleep(waitTime);
      } else if (error.status >= 500) {
        // Server error - retry with backoff
        await sleep(Math.pow(2, attempt) * 1000);
      } else {
        // Client error (4xx other than 429) - don't retry
        break;
      }
    }
  }

  return {
    success: false,
    id: item.id,
    error: lastError.message
  };
}

async function processBatchWithRetry(items) {
  // For large batches, wrap processWithRetry with the p-limit
  // pool from earlier to cap concurrency
  const results = await Promise.all(
    items.map(item => processWithRetry(item))
  );

  const succeeded = results.filter(r => r.success);
  const failed = results.filter(r => !r.success);

  console.log(`Completed: ${succeeded.length}, Failed: ${failed.length}`);
  return { succeeded, failed };
}
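For very large batches, consider persisting failures so a later run can reprocess only what's missing instead of redoing completed work. A minimal sketch using JSON Lines (the failed.jsonl filename and storing the original items are illustrative choices):

const fs = require('fs');

// Append the original items that failed, one JSON object per line
function saveFailed(failedItems, path = 'failed.jsonl') {
  const lines = failedItems.map(item => JSON.stringify(item)).join('\n');
  fs.appendFileSync(path, lines + '\n');
}

// Load them back to re-run only the failures
function loadFailed(path = 'failed.jsonl') {
  return fs.readFileSync(path, 'utf8')
    .split('\n')
    .filter(Boolean)
    .map(line => JSON.parse(line));
}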
Be mindful of rate limits. With 10 concurrent requests, you'll hit most rate limits quickly. Implement backoff and don't exceed your plan's limits.
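Backoff reacts to 429s after the fact; you can also throttle proactively so requests never exceed a requests-per-minute budget. A minimal token-bucket sketch (the RateLimiter class and the 60 requests/minute figure are illustrative, not a Vigthoria plan limit):

// Token bucket: refills ratePerMinute tokens evenly over each minute
class RateLimiter {
  constructor(ratePerMinute) {
    this.capacity = ratePerMinute;
    this.tokens = ratePerMinute;
    this.refillMs = 60000 / ratePerMinute;
    this.lastRefill = Date.now();
  }

  async acquire() {
    for (;;) {
      const elapsed = Date.now() - this.lastRefill;
      const refilled = Math.floor(elapsed / this.refillMs);
      if (refilled > 0) {
        this.tokens = Math.min(this.capacity, this.tokens + refilled);
        this.lastRefill += refilled * this.refillMs;
      }
      if (this.tokens > 0) {
        this.tokens--;
        return;
      }
      await new Promise(resolve => setTimeout(resolve, this.refillMs));
    }
  }
}

// Usage: acquire a token before each request
const limiter = new RateLimiter(60);

async function processItemThrottled(item) {
  await limiter.acquire();
  return processItem(item);
}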
Track progress on long-running batches with events:

const EventEmitter = require('events');

class BatchProcessor extends EventEmitter {
  constructor(items, options = {}) {
    super();
    this.items = items;
    this.concurrency = options.concurrency || 10;
    // Reuses the processItem function defined earlier;
    // pass options.processItem to override
    this.processItem = options.processItem || processItem;
    this.completed = 0;
    this.failed = 0;
    this.results = [];
  }

  async process() {
    this.emit('start', { total: this.items.length });

    const chunks = this.chunk(this.items, this.concurrency);

    for (const chunk of chunks) {
      const results = await Promise.all(
        chunk.map(async item => {
          try {
            const result = await this.processItem(item);
            this.completed++;
            this.emit('progress', {
              completed: this.completed,
              total: this.items.length,
              percent: Math.round((this.completed / this.items.length) * 100)
            });
            return result;
          } catch (err) {
            this.failed++;
            this.emit('error', { item, error: err });
            return null;
          }
        })
      );
      this.results.push(...results.filter(r => r !== null));
    }

    this.emit('complete', {
      completed: this.completed,
      failed: this.failed,
      results: this.results
    });

    return this.results;
  }

  chunk(array, size) {
    const chunks = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }
}

// Usage
const processor = new BatchProcessor(items, { concurrency: 10 });

processor.on('start', ({ total }) => console.log(`Starting ${total} items`));
processor.on('progress', ({ percent }) => console.log(`Progress: ${percent}%`));
processor.on('error', ({ item, error }) => console.error(`Error on ${item.id}:`, error.message));
processor.on('complete', ({ completed, failed }) => {
  console.log(`Done! ${completed} succeeded, ${failed} failed`);
});

await processor.process();
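Note the design trade-off: BatchProcessor runs items in fixed chunks, so each chunk waits for its slowest request before the next begins, and progress events arrive in predictable waves. The p-limit approach above keeps the pool full instead, which yields higher throughput when response times vary.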