
Python SDK Guide
A complete, runnable example for integrating Zywrap's V1 offline data (including dynamic schemas) into your Python (Flask/FastAPI) application.
Option 1: Cloud Proxy (Quick Start)
Don't want to host the offline data yourself? Use our official PyPI package to connect directly to the Zywrap Cloud Proxy in seconds.
pip install zywrapfrom zywrap import Zywrap, ZywrapError
# Initialize the client
client = Zywrap("YOUR_API_KEY")
try:
response = client.execute(
model="openai-gpt-5.4",
wrapper_codes=["mc_seo_meta_titles_descriptions_base"],
variables={
"pageTopic": "AI project management software",
"pageIntent": "Compare software options",
"primaryKeyword": "agency project management",
"brandVoice": "clear and trustworthy"
}
)
print(response["data"])
except ZywrapError as e:
print(f"Failed to execute: {e}")Option 2: Advanced Local Database Sync
Self-host the wrapper data to ensure zero-latency lookups, complete offline capability, and highly customized API integrations.
Step 1: Download Your Data Bundle
Download the V1 ZIP file containing the highly compressed Tabular JSON data. Unzip the zywrap-data.json file from the bundle to use in the import script.
You must be logged in to download the SDK data bundle.
Step 2: Database Setup
Run this SQL. It features the relational use_cases table containing the dynamic schema_data.
-- Database Schema for Zywrap Offline SDK v1.0 (PostgreSQL)
CREATE TABLE "ai_models" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
"ordering" INT
);
CREATE TABLE "categories" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
"ordering" INT
);
CREATE TABLE "languages" (
"code" VARCHAR(10) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
"ordering" INT
);
CREATE TABLE "use_cases" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"description" TEXT,
"category_code" VARCHAR(255) REFERENCES categories(code) ON DELETE SET NULL,
"schema_data" JSONB,
"status" BOOLEAN DEFAULT TRUE,
"ordering" BIGINT
);
CREATE TABLE "wrappers" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"description" TEXT,
"use_case_code" VARCHAR(255) REFERENCES use_cases(code) ON DELETE SET NULL,
"featured" BOOLEAN DEFAULT FALSE,
"base" BOOLEAN DEFAULT FALSE,
"status" BOOLEAN DEFAULT TRUE,
"ordering" BIGINT
);
CREATE TABLE "block_templates" (
"type" VARCHAR(50) NOT NULL,
"code" VARCHAR(255) NOT NULL,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
PRIMARY KEY ("type", "code")
);
CREATE TABLE "settings" (
"setting_key" VARCHAR(255) PRIMARY KEY,
"setting_value" TEXT
);
CREATE TABLE "usage_logs" (
"id" BIGSERIAL PRIMARY KEY,
"trace_id" VARCHAR(255),
"wrapper_code" VARCHAR(255),
"model_code" VARCHAR(255),
"prompt_tokens" INT DEFAULT 0,
"completion_tokens" INT DEFAULT 0,
"total_tokens" INT DEFAULT 0,
"credits_used" BIGINT DEFAULT 0,
"latency_ms" INT DEFAULT 0,
"status" VARCHAR(50) DEFAULT 'success',
"error_message" TEXT,
"created_at" TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes for performance
CREATE INDEX idx_usage_wrapper ON usage_logs(wrapper_code);
CREATE INDEX idx_usage_model ON usage_logs(model_code);
CREATE INDEX idx_use_case_cat ON use_cases(category_code);
CREATE INDEX idx_wrapper_uc ON wrappers(use_case_code);
Step 3: Database Connection
Save this as db.py. We recommend using psycopg2 for PostgreSQL.
# FILE: db.py
# Uses the 'psycopg2' library for PostgreSQL
# pip install psycopg2-binary
import psycopg2
import psycopg2.extras
import sys
# Replace with your actual database credentials
DB_SETTINGS = {
"dbname": "zywrap_db",
"user": "postgres",
"password": "password",
"host": "localhost",
"port": "5432"
}
def get_db_connection():
"""Establishes and returns a new database connection."""
try:
conn = psycopg2.connect(**DB_SETTINGS)
return conn
except psycopg2.OperationalError as e:
print(f"FATAL: Could not connect to the database.\n{e}", file=sys.stderr)
sys.exit(1)
Step 4: Import Tabular Data
This script parses the compressed V1 JSON and securely bulk-inserts it into your database.
# FILE: import.py
# USAGE: python import.py
# This script assumes you have 'zywrap-data.json' in the same directory.
import json
import sys
from db import get_db_connection
def extract_tabular(tabular_data):
"""Helper to expand tabular JSON data into dictionaries"""
if not tabular_data or not tabular_data.get('cols') or not tabular_data.get('data'):
return []
cols = tabular_data['cols']
return [dict(zip(cols, row)) for row in tabular_data['data']]
def main():
print("Starting lightning-fast v1.0 data import...")
try:
with open('zywrap-data.json', 'r', encoding='utf-8') as f:
data = json.load(f)
except FileNotFoundError:
print("FATAL: zywrap-data.json not found.", file=sys.stderr)
sys.exit(1)
conn = get_db_connection()
try:
with conn.cursor() as cur:
# 1. Clear existing data
print("Clearing tables...")
cur.execute("TRUNCATE wrappers, use_cases, categories, languages, block_templates, ai_models, settings RESTART IDENTITY CASCADE")
# psycopg2 autocommits by default only outside blocks, so we are in a transaction implicitly.
# 1. Import Categories
if 'categories' in data:
for c in extract_tabular(data['categories']):
cur.execute(
"INSERT INTO categories (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
(c['code'], c['name'], c.get('ordering', 99999))
)
print("Categories imported successfully.")
# 2. Import Use Cases
if 'useCases' in data:
for uc in extract_tabular(data['useCases']):
schema_json = json.dumps(uc['schema']) if uc.get('schema') else None
cur.execute(
"INSERT INTO use_cases (code, name, description, category_code, schema_data, status, ordering) VALUES (%s, %s, %s, %s, %s, TRUE, %s)",
(uc['code'], uc['name'], uc.get('desc'), uc.get('cat'), schema_json, uc.get('ordering', 999999999))
)
print("Use Cases imported successfully.")
# 3. Import Wrappers
if 'wrappers' in data:
for w in extract_tabular(data['wrappers']):
featured = bool(w.get('featured'))
base = bool(w.get('base'))
cur.execute(
"INSERT INTO wrappers (code, name, description, use_case_code, featured, base, status, ordering) VALUES (%s, %s, %s, %s, %s, %s, TRUE, %s)",
(w['code'], w['name'], w.get('desc'), w.get('usecase'), featured, base, w.get('ordering', 999999999))
)
print("Wrappers imported successfully.")
# 4. Import Languages
if 'languages' in data:
ord_counter = 1
for l in extract_tabular(data['languages']):
cur.execute(
"INSERT INTO languages (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
(l['code'], l['name'], ord_counter)
)
ord_counter += 1
print("Languages imported successfully.")
# 5. Import AI Models
if 'aiModels' in data:
for m in extract_tabular(data['aiModels']):
cur.execute(
"INSERT INTO ai_models (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
(m['code'], m['name'], m.get('ordering', 99999))
)
print("AI Models imported successfully.")
# 6. Import Block Templates
if 'templates' in data:
for type_name, tabular in data['templates'].items():
for tpl in extract_tabular(tabular):
cur.execute(
"INSERT INTO block_templates (type, code, name, status) VALUES (%s, %s, %s, TRUE)",
(type_name, tpl['code'], tpl['name'])
)
print("Block templates imported successfully.")
# 7. Store the version
if 'version' in data:
cur.execute(
"INSERT INTO settings (setting_key, setting_value) VALUES ('data_version', %s) ON CONFLICT (setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value",
(data['version'],)
)
print("Data version saved to settings table.")
conn.commit()
print(f"\n✅ v1.0 Import complete! Version: {data.get('version', 'N/A')}")
except Exception as e:
conn.rollback()
print(f"FATAL: Database error during import.\n{e}", file=sys.stderr)
finally:
conn.close()
if __name__ == "__main__":
main()
Step 5: The Dynamic API & Playground
This Flask server reads from your local DB, handles dynamic schemas, forwards the final request to the Zywrap API, and logs local usage automatically.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Zywrap V1 Offline Playground</title>
<style>
:root {
--primary: #2563eb; --primary-hover: #1d4ed8;
--bg: #f8fafc; --card-bg: #ffffff;
--text-main: #0f172a; --text-muted: #64748b;
--border: #e2e8f0; --radius: 0.75rem;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body { font-family: system-ui, -apple-system, sans-serif; background: var(--bg); color: var(--text-main); line-height: 1.5; padding: 2rem; }
.container { max-width: 1200px; margin: 0 auto; display: grid; grid-template-columns: 1fr 1.8fr; gap: 2rem; }
@media (max-width: 768px) { .container { grid-template-columns: 1fr; } }
.card { background: var(--card-bg); border-radius: var(--radius); border: 1px solid var(--border); box-shadow: 0 1px 3px rgba(0,0,0,0.05); padding: 1.5rem; margin-bottom: 1.5rem; }
.card-title { font-size: 1.25rem; font-weight: 700; margin-bottom: 1rem; color: var(--text-main); }
.form-group { margin-bottom: 1rem; }
label { display: block; font-size: 0.875rem; font-weight: 600; margin-bottom: 0.35rem; color: #334155; }
.input, select, textarea {
width: 100%; padding: 0.625rem 0.75rem; border: 1px solid var(--border);
border-radius: 0.5rem; font-size: 0.875rem; background: #fff; transition: border-color 0.2s;
}
.input:focus, select:focus, textarea:focus { outline: none; border-color: var(--primary); box-shadow: 0 0 0 3px rgba(37,99,235,0.1); }
textarea { resize: vertical; min-height: 80px; }
.grid-2 { display: grid; grid-template-columns: 1fr 1fr; gap: 0.75rem; }
.btn {
width: 100%; background: var(--primary); color: white; border: none; padding: 0.875rem;
border-radius: 0.5rem; font-weight: 600; font-size: 1rem; cursor: pointer; transition: background 0.2s;
}
.btn:hover { background: var(--primary-hover); }
.btn:disabled { opacity: 0.7; cursor: not-allowed; }
.schema-section { background: #f8fafc; border: 1px solid var(--border); border-radius: 0.5rem; padding: 1rem; margin-bottom: 1rem; }
.schema-title { font-size: 0.875rem; font-weight: 700; border-bottom: 1px solid var(--border); padding-bottom: 0.5rem; margin-bottom: 0.75rem; color: #475569; }
#output { background: #f1f5f9; padding: 1.25rem; border-radius: 0.5rem; font-family: ui-monospace, monospace; font-size: 0.875rem; white-space: pre-wrap; min-height: 300px; border: 1px solid var(--border); overflow-x: auto; }
</style>
</head>
<body>
<div class="container">
<div>
<div class="card">
<h2 class="card-title">Configuration</h2>
<div class="form-group">
<label>1. Category</label>
<select id="category"><option value="">Loading...</option></select>
</div>
<div class="form-group">
<label>2. AI Solution</label>
<select id="usecase" disabled><option value="">-- Select Category First --</option></select>
</div>
<div class="form-group">
<label>3. Configuration Style</label>
<select id="wrapper" disabled><option value="">-- Select Solution First --</option></select>
</div>
<div class="form-group">
<label>4. AI Model</label>
<select id="model"><option value="">Loading...</option></select>
</div>
<div class="form-group">
<label>5. Target Language</label>
<select id="language"><option value="">Loading...</option></select>
</div>
</div>
<div class="card">
<h2 class="card-title" style="font-size: 1rem;">Advanced Overrides</h2>
<div class="grid-2 overrides">
<div><label>Tone</label><select id="toneCode"><option value="">-- Default --</option></select></div>
<div><label>Style</label><select id="styleCode"><option value="">-- Default --</option></select></div>
<div><label>Formatting</label><select id="formatCode"><option value="">-- Default --</option></select></div>
<div><label>Complexity</label><select id="complexityCode"><option value="">-- Default --</option></select></div>
<div><label>Length</label><select id="lengthCode"><option value="">-- Default --</option></select></div>
<div><label>Audience</label><select id="audienceCode"><option value="">-- Default --</option></select></div>
<div><label>Goal</label><select id="responseGoalCode"><option value="">-- Default --</option></select></div>
<div><label>Output Type</label><select id="outputCode"><option value="">-- Default --</option></select></div>
</div>
</div>
</div>
<div>
<div class="card">
<div id="dynamic-schema-container"></div>
<div class="form-group">
<label id="prompt-label">Prompt / Additional Context</label>
<textarea id="prompt" class="input" placeholder="Type your request or additional instructions here..."></textarea>
</div>
<button id="run-button" class="btn">Generate Response</button>
</div>
<div class="card">
<h2 class="card-title">AI Response</h2>
<pre id="output">Output will appear here...</pre>
</div>
</div>
</div>
<script>
// Points to the Flask Backend
const API_ENDPOINT = 'http://localhost:5000/api';
const elements = {
category: document.getElementById('category'),
usecase: document.getElementById('usecase'), // 🚀 NEW
wrapper: document.getElementById('wrapper'),
model: document.getElementById('model'),
language: document.getElementById('language'),
schemaContainer: document.getElementById('dynamic-schema-container'),
prompt: document.getElementById('prompt'),
promptLabel: document.getElementById('prompt-label'),
runBtn: document.getElementById('run-button'),
output: document.getElementById('output')
};
async function fetchAPI(action, params = '') {
const res = await fetch(`${API_ENDPOINT}?action=${action}&${params}`);
return await res.json();
}
function populateSelect(el, data, placeholder = '-- Select --') {
el.innerHTML = `<option value="">${placeholder}</option>`;
data.forEach(item => {
const opt = document.createElement('option');
opt.value = item.code;
opt.textContent = item.name;
el.appendChild(opt);
});
el.disabled = false;
}
async function init() {
const [categories, models, langs, templates] = await Promise.all([
fetchAPI('get_categories'),
fetchAPI('get_ai_models'),
fetchAPI('get_languages'),
fetchAPI('get_block_templates')
]);
populateSelect(elements.category, categories);
populateSelect(elements.model, models, 'Default Model');
populateSelect(elements.language, langs, 'English (Default)');
const overrideMap = {
tones: 'toneCode', styles: 'styleCode', formattings: 'formatCode',
complexities: 'complexityCode', lengths: 'lengthCode', audienceLevels: 'audienceCode',
responseGoals: 'responseGoalCode', outputTypes: 'outputCode'
};
for (const [type, elId] of Object.entries(overrideMap)) {
if (templates[type]) populateSelect(document.getElementById(elId), templates[type], '-- Default --');
}
}
// 🚀 CASCADING LOGIC 1: Category -> Load Use Cases
elements.category.addEventListener('change', async () => {
if (!elements.category.value) {
elements.usecase.innerHTML = '<option value="">-- Select Category First --</option>';
elements.usecase.disabled = true;
elements.wrapper.innerHTML = '<option value="">-- Select Solution First --</option>';
elements.wrapper.disabled = true;
elements.schemaContainer.innerHTML = '';
return;
}
elements.usecase.innerHTML = '<option value="">Loading...</option>';
const useCases = await fetchAPI('get_use_cases', 'category=' + elements.category.value);
populateSelect(elements.usecase, useCases, '-- Select a Solution --');
// Reset wrapper
elements.wrapper.innerHTML = '<option value="">-- Select Solution First --</option>';
elements.wrapper.disabled = true;
elements.schemaContainer.innerHTML = '';
});
// 🚀 CASCADING LOGIC 2: Use Case -> Load Wrappers
elements.usecase.addEventListener('change', async () => {
if (!elements.usecase.value) {
elements.wrapper.innerHTML = '<option value="">-- Select Solution First --</option>';
elements.wrapper.disabled = true;
elements.schemaContainer.innerHTML = '';
return;
}
elements.wrapper.innerHTML = '<option value="">Loading...</option>';
let wrappers = await fetchAPI('get_wrappers', 'usecase=' + elements.usecase.value);
// Format the labels cleanly (Base vs Variation)
elements.wrapper.innerHTML = '<option value="">-- Select a Style --</option>';
let autoSelectCode = null; // Track which wrapper to auto-select
wrappers.forEach((w, index) => {
const opt = document.createElement('option');
opt.value = w.code;
const parts = w.name.split('—');
const displayName = w.base ? `✨ Base Template - ${parts[0].trim()}` : `↳ Variation: ${parts.length > 1 ? parts[1].trim() : w.name}`;
opt.textContent = displayName;
elements.wrapper.appendChild(opt);
// Identify the Base template, or fallback to the first available variation
if (w.base) {
autoSelectCode = w.code;
} else if (index === 0 && !autoSelectCode) {
autoSelectCode = w.code;
}
});
elements.wrapper.disabled = false;
// 🚀 NEW: Auto-select and trigger schema load
if (autoSelectCode) {
elements.wrapper.value = autoSelectCode;
elements.wrapper.dispatchEvent(new Event('change'));
}
});
// 🚀 CASCADING LOGIC 3: Wrapper -> Load Schema
elements.wrapper.addEventListener('change', async () => {
elements.schemaContainer.innerHTML = '';
elements.promptLabel.textContent = "Prompt / Additional Context";
if (!elements.wrapper.value) return;
const schema = await fetchAPI('get_schema', 'wrapper=' + elements.wrapper.value);
if (!schema || (!schema.req && !schema.opt)) return;
let html = '';
elements.promptLabel.textContent = "Additional Free-form Instructions";
const buildSection = (title, data) => {
if (!data || Object.keys(data).length === 0) return '';
let sectionHtml = `<div class="schema-section"><h3 class="schema-title">${title}</h3><div class="grid-2">`;
for (const [key, def] of Object.entries(data)) {
const isPlaceholder = def.p !== undefined ? def.p : false;
const defaultVal = def.d !== undefined ? def.d : '';
const placeholderAttr = isPlaceholder ? `placeholder="${defaultVal}"` : '';
const valueAttr = (!isPlaceholder && defaultVal) ? `value="${defaultVal}"` : '';
const label = key.replace(/([A-Z])/g, ' $1').replace(/^./, str => str.toUpperCase());
sectionHtml += `<div>
<label>${label}</label>
<input type="text" class="input schema-input" data-key="${key}" ${placeholderAttr} ${valueAttr}>
</div>`;
}
return sectionHtml + '</div></div>';
};
html += buildSection('Core Inputs', schema.req);
html += buildSection('Additional Context', schema.opt);
elements.schemaContainer.innerHTML = html;
});
// Execute
elements.runBtn.addEventListener('click', async () => {
if (!elements.wrapper.value) return alert("Please select a wrapper.");
elements.output.textContent = 'Executing...';
elements.runBtn.disabled = true;
elements.runBtn.textContent = 'Generating...';
// 1. Grab the exact user prompt
let finalPrompt = elements.prompt.value.trim();
// 2. Build the variables object natively
const variables = {};
document.querySelectorAll('.schema-input').forEach(input => {
const val = input.value.trim();
if (val !== '') {
variables[input.dataset.key] = val;
}
});
// 3. Build overrides
const overrides = {};
document.querySelectorAll('.overrides select').forEach(sel => {
if (sel.value) overrides[sel.id] = sel.value;
});
try {
const response = await fetch(API_ENDPOINT + '?action=execute', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
model: elements.model.value,
wrapperCode: elements.wrapper.value,
language: elements.language.value,
prompt: finalPrompt, // Only the raw text area
variables: variables, // Only the object (backend formats it!)
overrides: overrides
})
});
const text = await response.text();
if (!response.ok) {
let errMsg = text;
try { errMsg = JSON.parse(text).error || errMsg; } catch(e) {}
throw new Error(errMsg);
}
const data = JSON.parse(text);
elements.output.textContent = data.output || JSON.stringify(data, null, 2);
} catch (error) {
elements.output.textContent = 'Error: ' + error.message;
} finally {
elements.runBtn.disabled = false;
elements.runBtn.textContent = 'Generate Response';
}
});
init();
</script>
</body>
</html>
# FILE: app.py
# A simple Flask server to replicate the 'api.php' V1 playground backend.
#
# REQUIREMENTS:
# pip install flask flask-cors requests psycopg2-binary
#
# USAGE:
# 1. Save this as 'app.py'
# 2. Run: flask --app app run
# 3. Open 'playground.html' in your browser.
import json
import time
import requests
import sys
from db import get_db_connection
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
from psycopg2.extras import RealDictCursor
app = Flask(__name__)
CORS(app)
ZYWRAP_API_KEY = "YOUR_ZYWRAP_API_KEY"
ZYWRAP_PROXY_URL = 'https://api.zywrap.com/v1/proxy'
# --- Database Helper Functions ---
def get_categories(cur):
cur.execute("SELECT code, name FROM categories WHERE status = TRUE ORDER BY ordering ASC")
return cur.fetchall()
# 🚀 NEW: Fetch Solutions (Use Cases) by Category
def get_use_cases(cur, category_code):
cur.execute("""
SELECT code, name
FROM use_cases
WHERE category_code = %s AND status = TRUE
ORDER BY ordering ASC
""", (category_code,))
return cur.fetchall()
# 🚀 UPDATED: Fetch Wrappers (Styles) by Use Case
def get_wrappers_by_use_case(cur, use_case_code):
cur.execute("""
SELECT code, name, featured, base
FROM wrappers
WHERE use_case_code = %s AND status = TRUE
ORDER BY ordering ASC
""", (use_case_code,))
return cur.fetchall()
def get_schema_by_wrapper(cur, wrapper_code):
cur.execute("""
SELECT uc.schema_data
FROM use_cases uc
JOIN wrappers w ON w.use_case_code = uc.code
WHERE w.code = %s AND w.status = TRUE AND uc.status = TRUE
""", (wrapper_code,))
res = cur.fetchone()
return res['schema_data'] if res else None
def get_languages(cur):
cur.execute("SELECT code, name FROM languages WHERE status = TRUE ORDER BY ordering ASC")
return cur.fetchall()
def get_ai_models(cur):
cur.execute("SELECT code, name FROM ai_models WHERE status = TRUE ORDER BY ordering ASC")
return cur.fetchall()
def get_block_templates(cur):
cur.execute("SELECT type, code, name FROM block_templates WHERE status = TRUE ORDER BY type, name ASC")
results = cur.fetchall()
grouped = {}
for row in results:
t = row['type']
if t not in grouped: grouped[t] = []
grouped[t].append({'code': row['code'], 'name': row['name']})
return grouped
# ✅ HYBRID PROXY EXECUTION
def execute_zywrap_proxy(api_key, model, wrapper_code, prompt, language=None, variables={}, overrides={}):
payload_data = {
'model': model,
'wrapperCodes': [wrapper_code],
'prompt': prompt,
'variables': variables,
'source': 'python_sdk'
}
if language: payload_data['language'] = language
if overrides: payload_data.update(overrides)
clean_key = api_key.strip()
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {clean_key}',
'User-Agent': 'ZywrapPythonSDK/1.1'
}
try:
response = requests.post(ZYWRAP_PROXY_URL, json=payload_data, headers=headers, stream=True, timeout=300)
if response.status_code == 200:
final_json = None
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8').strip()
if decoded_line.startswith('data: '):
json_str = decoded_line[6:]
try:
data = json.loads(json_str)
if data and ('output' in data or 'error' in data):
final_json = data
except json.JSONDecodeError:
pass
if final_json:
status_code = 400 if 'error' in final_json else 200
return final_json, status_code
else:
return {'error': 'Stream parse failed'}, 500
else:
try: return response.json(), response.status_code
except ValueError: return {'error': response.text}, response.status_code
except requests.exceptions.RequestException as e:
error_msg = str(e)
if e.response is not None:
try: return e.response.json(), e.response.status_code
except: return {'error': e.response.text}, e.response.status_code
return {'error': error_msg}, 500
# --- API Router ---
@app.route('/api', methods=['GET', 'POST'])
def api_router():
conn = get_db_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
if request.method == 'GET':
action = request.args.get('action')
if action == 'get_categories': return jsonify(get_categories(cur))
if action == 'get_use_cases': return jsonify(get_use_cases(cur, request.args.get('category')))
if action == 'get_wrappers': return jsonify(get_wrappers_by_use_case(cur, request.args.get('usecase')))
if action == 'get_languages': return jsonify(get_languages(cur))
if action == 'get_ai_models': return jsonify(get_ai_models(cur))
if action == 'get_block_templates': return jsonify(get_block_templates(cur))
if action == 'get_schema': return jsonify(get_schema_by_wrapper(cur, request.args.get('wrapper')))
if request.method == 'POST':
input_data = request.get_json()
action = request.args.get('action') or input_data.get('action')
if action == 'execute':
start_time = time.time()
result, status_code = execute_zywrap_proxy(
ZYWRAP_API_KEY,
input_data.get('model'),
input_data.get('wrapperCode', ''),
input_data.get('prompt', ''),
input_data.get('language'),
input_data.get('variables', {}),
input_data.get('overrides', {})
)
latency_ms = int((time.time() - start_time) * 1000)
try:
status_text = 'success' if status_code == 200 else 'error'
trace_id = result.get('id')
usage = result.get('usage', {})
p_tokens = usage.get('prompt_tokens', 0)
c_tokens = usage.get('completion_tokens', 0)
t_tokens = usage.get('total_tokens', 0)
credits_used = result.get('cost', {}).get('credits_used', 0)
error_message = result.get('error') if status_text == 'error' else None
if error_message:
error_msg_str = str(error_message)
error_message = error_msg_str[:255] + '...' if len(error_msg_str) > 255 else error_msg_str
cur.execute("""
INSERT INTO usage_logs
(trace_id, wrapper_code, model_code, prompt_tokens, completion_tokens, total_tokens, credits_used, latency_ms, status, error_message)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
trace_id, input_data.get('wrapperCode'), input_data.get('model', 'default'),
p_tokens, c_tokens, t_tokens, credits_used, latency_ms, status_text, error_message
))
conn.commit()
except Exception as log_err:
print(f"Failed to write to usage_logs: {log_err}", file=sys.stderr)
conn.rollback()
return jsonify(result), status_code
return jsonify({'error': 'Invalid action'}), 400
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
conn.close()
if __name__ == '__main__':
print(f"Zywrap Python SDK Playground backend listening at http://localhost:5000")
app.run(debug=True, port=5000)
Step 6: Synchronize Data
Run this script on a cron job. It features automatic unzipping for FULL resets and ON CONFLICT optimized upserts for DELTA updates.
# FILE: zywrap-sync.py
# USAGE: python zywrap-sync.py
# REQUIREMENTS: pip install requests psycopg2-binary
import requests
import sys
import os
import zipfile
import psycopg2.extras
from db import get_db_connection
# --- CONFIGURATION ---
DEVELOPER_API_KEY = 'YOUR_ZYWRAP_API_KEY_HERE'
ZYWRAP_API_ENDPOINT = 'https://api.zywrap.com/v1/sdk/v1/sync'
# ---------------------
def get_current_version(cur):
cur.execute("SELECT setting_value FROM settings WHERE setting_key = 'data_version'")
result = cur.fetchone()
return result[0] if result else ''
def save_new_version(cur, version):
cur.execute(
"INSERT INTO settings (setting_key, setting_value) VALUES ('data_version', %s) ON CONFLICT (setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value",
(version,)
)
# --- HELPER FUNCTIONS ---
def upsert_batch(cur, table, rows, cols, pk='code'):
"""Optimized Upsert using Postgres ON CONFLICT"""
if not rows: return
col_names = ", ".join(cols)
updates = [f"{c} = EXCLUDED.{c}" for c in cols if c != pk and c != 'type']
update_clause = ", ".join(updates)
conflict_target = f"({pk})" if pk != 'compound_template' else "(type, code)"
query = f"""
INSERT INTO {table} ({col_names}) VALUES %s
ON CONFLICT {conflict_target} DO UPDATE SET {update_clause}
"""
try:
psycopg2.extras.execute_values(cur, query, rows, page_size=1000)
print(f" [+] Upserted {len(rows)} records into '{table}'.")
except Exception as e:
print(f" [!] Error upserting {table}: {e}")
def delete_batch(cur, table, ids, pk='code'):
if not ids: return
query = f"DELETE FROM {table} WHERE {pk} = ANY(%s)"
try:
cur.execute(query, (list(ids),))
print(f" [-] Deleted {len(ids)} records from '{table}'.")
except Exception as e:
print(f" [!] Error deleting from {table}: {e}")
# --- MAIN LOGIC ---
def main():
print("--- 🚀 Starting Zywrap V1 Sync ---")
conn = get_db_connection()
try:
with conn.cursor() as cur:
current_version = get_current_version(cur)
print(f"🔹 Local Version: {current_version or 'None'}")
# Commit immediately to release the read-lock on the settings table!
# Without this, import.py will deadlock when trying to TRUNCATE.
conn.commit()
# 1. Fetch update info
headers = {'Authorization': f'Bearer {DEVELOPER_API_KEY}', 'Accept': 'application/json'}
params = {'fromVersion': current_version}
try:
response = requests.get(ZYWRAP_API_ENDPOINT, headers=headers, params=params, verify=False)
response.raise_for_status()
except Exception as e:
print(f"❌ API Error: {e}")
return
patch = response.json()
mode = patch.get('mode', 'UNKNOWN')
print(f"🔹 Sync Mode: {mode}")
# --- SCENARIO A: FULL RESET ---
if mode == 'FULL_RESET':
zip_path = 'zywrap-data.zip'
download_url = patch['wrappers']['downloadUrl']
print(f"⬇️ Attempting automatic download from Zywrap...")
dl = requests.get(download_url, headers=headers, stream=True, verify=False)
if dl.status_code == 200:
with open(zip_path, 'wb') as f:
for chunk in dl.iter_content(chunk_size=8192): f.write(chunk)
mb_size = round(os.path.getsize(zip_path) / 1024 / 1024, 2)
print(f"✅ Data bundle downloaded successfully ({mb_size} MB).")
try:
print("📦 Attempting auto-unzip...")
with zipfile.ZipFile(zip_path, 'r') as z:
z.extractall('.')
print("✅ Auto-unzip successful. Running import script...")
os.remove(zip_path)
import importlib.util
spec = importlib.util.spec_from_file_location("import_script", "import.py")
import_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(import_module)
import_module.main()
except Exception as z_err:
print("⚠️ Failed to auto-unzip (Check directory permissions).")
print("\n👉 ACTION REQUIRED:")
print(f" 1. Please manually unzip '{zip_path}' in this folder.")
print(" 2. Then run: python import.py")
else:
print(f"❌ Automatic download failed. HTTP Status: {dl.status_code}")
if os.path.exists(zip_path): os.remove(zip_path)
# --- SCENARIO B: DELTA UPDATE ---
elif mode == 'DELTA_UPDATE':
meta = patch.get('metadata', {})
# Categories
rows = [(r['code'], r['name'], bool(r.get('status', True)), r.get('position') or r.get('displayOrder') or r.get('ordering')) for r in meta.get('categories', [])]
upsert_batch(cur, 'categories', rows, ['code', 'name', 'status', 'ordering'])
# Languages
rows = [(r['code'], r['name'], bool(r.get('status', True)), r.get('ordering')) for r in meta.get('languages', [])]
upsert_batch(cur, 'languages', rows, ['code', 'name', 'status', 'ordering'])
# AI Models
rows = [(r['code'], r['name'], bool(r.get('status', True)), r.get('displayOrder') or r.get('ordering')) for r in meta.get('aiModels', [])]
upsert_batch(cur, 'ai_models', rows, ['code', 'name', 'status', 'ordering'])
# Templates
rows = []
for type_name, items in meta.get('templates', {}).items():
for i in items:
rows.append((type_name, i['code'], i.get('label') or i.get('name'), bool(i.get('status', True))))
upsert_batch(cur, 'block_templates', rows, ['type', 'code', 'name', 'status'], pk='compound_template')
# Use Cases
upserts = patch.get('useCases', {}).get('upserts', [])
if upserts:
rows = []
for uc in upserts:
schema_str = json.dumps(uc['schema']) if uc.get('schema') else None
rows.append((uc['code'], uc['name'], uc.get('description'), uc.get('categoryCode'), schema_str, bool(uc.get('status', True)), uc.get('displayOrder') or uc.get('ordering')))
upsert_batch(cur, 'use_cases', rows, ['code', 'name', 'description', 'category_code', 'schema_data', 'status', 'ordering'])
# Wrappers
upserts = patch.get('wrappers', {}).get('upserts', [])
if upserts:
rows = []
for w in upserts:
rows.append((w['code'], w['name'], w.get('description'), w.get('useCaseCode') or w.get('categoryCode'), bool(w.get('featured') or w.get('isFeatured')), bool(w.get('base') or w.get('isBaseWrapper')), bool(w.get('status', True)), w.get('displayOrder') or w.get('ordering')))
upsert_batch(cur, 'wrappers', rows, ['code', 'name', 'description', 'use_case_code', 'featured', 'base', 'status', 'ordering'])
# Deletes
delete_batch(cur, 'wrappers', patch.get('wrappers', {}).get('deletes', []))
delete_batch(cur, 'use_cases', patch.get('useCases', {}).get('deletes', []))
# Version
if patch.get('newVersion'):
save_new_version(cur, patch['newVersion'])
conn.commit()
print("✅ Delta Sync Complete.")
else:
print("✅ No updates needed.")
except Exception as e:
if not conn.closed:
conn.rollback()
print(f"FATAL: Sync Failed: {e}", file=sys.stderr)
finally:
if not conn.closed:
conn.close()
if __name__ == "__main__":
main()
Programmatically Download
Stream the download directly to disk to prevent memory issues.
# FILE: download_bundle.py
import requests
import sys
import os
# --- CONFIGURATION ---
ZYWRAP_API_KEY = 'YOUR_API_KEY_HERE'
API_ENDPOINT = 'https://api.zywrap.com/v1/sdk/v1/download'
OUTPUT_FILE = 'zywrap-data.zip'
# ---------------------
def download_sdk_bundle():
print("Downloading latest V1 wrapper data from Zywrap...")
if not ZYWRAP_API_KEY or 'YOUR_API_KEY_HERE' in ZYWRAP_API_KEY:
print("FATAL: Please replace 'YOUR_API_KEY_HERE' with your actual Zywrap API key.", file=sys.stderr)
sys.exit(1)
headers = {'Authorization': f'Bearer {ZYWRAP_API_KEY}'}
try:
# stream=True ensures we don't load the entire zip into RAM at once
response = requests.get(API_ENDPOINT, headers=headers, stream=True, timeout=300, verify=False)
response.raise_for_status()
# Write directly to disk
with open(OUTPUT_FILE, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✅ Sync complete. Data saved to {OUTPUT_FILE}.")
print(f"Run 'unzip {OUTPUT_FILE}' to extract the 'zywrap-data.json' file, then run 'python import.py'.")
except requests.exceptions.HTTPError as e:
if os.path.exists(OUTPUT_FILE): os.remove(OUTPUT_FILE)
print(f"FATAL: API request failed with status code {e.response.status_code}.", file=sys.stderr)
sys.exit(1)
except Exception as e:
if os.path.exists(OUTPUT_FILE): os.remove(OUTPUT_FILE)
print(f"FATAL: An error occurred: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
download_sdk_bundle()

