为开发者和企业提供完整的任务处理解决方案
创建和管理多个应用,每个应用拥有独立的API密钥和任务队列。
支持优先级排序、重试机制、过期控制的智能任务队列管理。
简单易用的API接口,支持任务创建、状态查询、进度更新等操作。
实时监控任务状态、进度和处理器状态,提供完整的管理界面。
支持任务完成后的回调通知,及时获取处理结果。
基于应用密钥的安全认证机制,保护您的任务数据安全。
API地址: http://api.meiyim.cn/api.php
请求格式: JSON
响应格式: JSON
认证方式: 应用密钥(app_key)
接口: POST /api.php?action=create_task
请求参数:
{
"app_key": "你的应用密钥",
"task_type": "video_extract",
"task_data": {
"video_url": "https://example.com/video.mp4",
"output_format": "srt"
},
"priority": 3,
"callback_url": "https://your-site.com/callback"
}
响应示例:
{
"code": 1,
"msg": "任务创建成功",
"data": {
"task_id": 123,
"status": "pending"
}
}
接口: POST /api.php?action=get_next_task
请求参数:
{
"app_key": "你的应用密钥",
"worker_id": "worker_001",
"task_types": ["video_extract", "image_enhance"]
}
响应示例:
{
"code": 1,
"msg": "获取任务成功",
"data": {
"id": 123,
"task_type": "video_extract",
"task_data": {
"video_url": "https://example.com/video.mp4",
"output_format": "srt"
},
"priority": 3,
"max_retries": 3
}
}
接口: POST /api.php?action=update_progress
请求参数:
{
"app_key": "你的应用密钥",
"task_id": 123,
"progress": 50,
"message": "正在处理中..."
}
接口: POST /api.php?action=complete_task
请求参数:
{
"app_key": "你的应用密钥",
"task_id": 123,
"result": {
"output_file": "https://example.com/output.srt",
"duration": 120,
"subtitle_count": 45
}
}
接口: POST /api.php?action=fail_task
请求参数:
{
"app_key": "你的应用密钥",
"task_id": 123,
"error_message": "视频文件下载失败",
"allow_retry": true
}
import requests
import json
import time
class CloudTaskClient:
def __init__(self, api_url, app_key):
self.api_url = api_url
self.app_key = app_key
def create_task(self, task_type, task_data, priority=3):
"""创建任务"""
data = {
'app_key': self.app_key,
'task_type': task_type,
'task_data': task_data,
'priority': priority
}
response = requests.post(
f"{self.api_url}?action=create_task",
json=data
)
return response.json()
def get_next_task(self, worker_id, task_types=None):
"""获取下一个待处理任务"""
data = {
'app_key': self.app_key,
'worker_id': worker_id,
'task_types': task_types or []
}
response = requests.post(
f"{self.api_url}?action=get_next_task",
json=data
)
return response.json()
def update_progress(self, task_id, progress, message=""):
"""更新任务进度"""
data = {
'app_key': self.app_key,
'task_id': task_id,
'progress': progress,
'message': message
}
response = requests.post(
f"{self.api_url}?action=update_progress",
json=data
)
return response.json()
def complete_task(self, task_id, result):
"""完成任务"""
data = {
'app_key': self.app_key,
'task_id': task_id,
'result': result
}
response = requests.post(
f"{self.api_url}?action=complete_task",
json=data
)
return response.json()
# 使用示例
client = CloudTaskClient(
api_url="http://api.meiyim.cn/api.php",
app_key="你的应用密钥"
)
# 创建任务
task_result = client.create_task(
task_type="video_extract",
task_data={
"video_url": "https://example.com/video.mp4",
"output_format": "srt"
}
)
print("任务创建结果:", task_result)