Switch To Other
Python script that physically switches the active AI engine in the Antigravity UI using browser automation.
import urllib.request, json, asyncio, sys
import socket
try:
import websockets
except ImportError:
import subprocess
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'websockets'])
import websockets
BASE_PORT = 9000
PORT_RANGE = 3
TRAMPOLINE_MODEL = "Gemini 3.5 Flash"
def find_active_cdp_port():
ports = [BASE_PORT]
for port in range(BASE_PORT - PORT_RANGE, BASE_PORT + PORT_RANGE + 1):
if port != BASE_PORT:
ports.append(port)
for port in ports:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.3)
s.connect(('127.0.0.1', port))
return port
except Exception:
pass
return None
async def get_chat_page(port):
try:
data = json.loads(urllib.request.urlopen(f'http://127.0.0.1:{port}/json/list', timeout=3).read())
except Exception:
print(f"FAILED: Cannot reach CDP on port {port}")
sys.exit(1)
candidates = []
for t in data:
if t.get('type') == 'page' and 'workbench.html' in t.get('url', ''):
if 'jetski' not in t.get('url', ''):
candidates.append(t)
if not candidates:
print(f"FAILED: No active chat page found on port {port}")
sys.exit(1)
if len(candidates) == 1:
return candidates[0]
# Multiple workbench windows open — find the focused one
async def _check_focus(candidate):
try:
async with websockets.connect(candidate['webSocketDebuggerUrl']) as ws:
await ws.send(json.dumps({
'id': 999, 'method': 'Runtime.evaluate',
'params': {'expression': 'document.hasFocus()', 'returnByValue': True}
}))
resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=5))
return resp.get('result', {}).get('result', {}).get('value', False)
except Exception:
return False
for c in candidates:
if await _check_focus(c):
return c
# Fallback to first candidate if none has focus
return candidates[0]
# ─── Reusable helpers ───────────────────────────────────────────────
JS_OPEN_DROPDOWN = """
(function() {
var btns = document.querySelectorAll('*[role="button"], button');
var targetBtn = null;
for (var i = 0; i < btns.length; i++) {
var txt = (btns[i].textContent || "").trim();
var popup = btns[i].getAttribute('aria-haspopup');
if (popup && popup !== 'false') {
if (txt.includes('Gemini') || txt.includes('Claude') || txt.includes('GPT') || txt.includes('Flash') || txt.includes('Pro') || txt.includes('Opus') || txt.includes('Sonnet')) {
targetBtn = btns[i];
break;
}
}
}
if (targetBtn) {
targetBtn.click();
return true;
}
// Fallback to clicking all popup elements
for (var i = 0; i < btns.length; i++) {
if (btns[i].getAttribute('aria-haspopup')) {
btns[i].click();
}
}
return true;
})()
"""
def js_select_model(model_name):
return f"""
(function() {{
var targetModel = {json.dumps(model_name)};
// Strategy 1: Target popover-item buttons (new Antigravity UI)
// These are the actual dropdown option buttons inside the model picker popover
var popoverItems = document.querySelectorAll('button[class*="popover-item"]');
for (var i = 0; i < popoverItems.length; i++) {{
// Check the inner span text, not full textContent (which may include badge text like "Fast")
var innerSpans = popoverItems[i].querySelectorAll('span.text-xs span');
for (var j = 0; j < innerSpans.length; j++) {{
if (innerSpans[j].textContent.trim() === targetModel) {{
var el = popoverItems[i];
['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
el.dispatchEvent(new MouseEvent(eventType, {{
bubbles: true,
cancelable: true,
view: window
}}));
}});
return JSON.stringify({{success: true, strategy: 'popover-item-exact'}});
}}
}}
// Fallback: check full textContent includes
if (popoverItems[i].textContent.trim().includes(targetModel)) {{
var el = popoverItems[i];
['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
el.dispatchEvent(new MouseEvent(eventType, {{
bubbles: true,
cancelable: true,
view: window
}}));
}});
return JSON.stringify({{success: true, strategy: 'popover-item-includes'}});
}}
}}
// Strategy 2: Target role=option or role=menuitem (legacy UI)
var roleItems = document.querySelectorAll('[role="option"], [role="menuitem"], [role="menuitemradio"]');
for (var i = 0; i < roleItems.length; i++) {{
if (roleItems[i].textContent.trim().includes(targetModel)) {{
var el = roleItems[i];
['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
el.dispatchEvent(new MouseEvent(eventType, {{
bubbles: true,
cancelable: true,
view: window
}}));
}});
return JSON.stringify({{success: true, strategy: 'role-item'}});
}}
}}
// Strategy 3: Target data-autofocus buttons (another way to identify dropdown items)
var autofocusItems = document.querySelectorAll('button[data-autofocus]');
for (var i = 0; i < autofocusItems.length; i++) {{
if (autofocusItems[i].textContent.trim().includes(targetModel)) {{
var el = autofocusItems[i];
['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
el.dispatchEvent(new MouseEvent(eventType, {{
bubbles: true,
cancelable: true,
view: window
}}));
}});
return JSON.stringify({{success: true, strategy: 'data-autofocus'}});
}}
}}
// Collect available items for error reporting
var available = [];
popoverItems.forEach(function(el) {{ if (el.textContent.trim()) available.push('popover: ' + el.textContent.trim().substring(0, 80)); }});
roleItems.forEach(function(el) {{ if (el.textContent.trim()) available.push('role: ' + el.textContent.trim().substring(0, 80)); }});
autofocusItems.forEach(function(el) {{ if (el.textContent.trim()) available.push('autofocus: ' + el.textContent.trim().substring(0, 80)); }});
return JSON.stringify({{error: 'Target model not found in dropdown', available: available}});
}})()
"""
JS_DISMISS_DIALOGS = """
(function() {
var escapeEvent = new KeyboardEvent('keydown', { key: 'Escape', bubbles: true, cancelable: true });
document.body.dispatchEvent(escapeEvent);
setTimeout(() => document.body.dispatchEvent(escapeEvent), 50);
})()
"""
def js_send_message(text):
return f"""
(async function() {{
var editors = Array.from(document.querySelectorAll('textarea, div[contenteditable="true"]'))
.filter(e => !e.className.includes('xterm') && !e.classList.contains('xterm-helper-textarea') &&
(e.closest('.pane-composite-part') != null || e.closest('.antigravity-agent-side-panel') != null || e.closest('.auxiliarybar') != null));
var editor = editors[0];
if (!editor) {{
var all = document.querySelectorAll('textarea, div[contenteditable="true"]');
editor = all[all.length - 1]; // Fallback to last one
}}
if (editor) {{
editor.focus();
document.execCommand('selectAll', false, null);
document.execCommand('insertText', false, {json.dumps(text)});
await new Promise(r => setTimeout(r, 500));
var enterEvent = new KeyboardEvent('keydown', {{ key: 'Enter', code: 'Enter', keyCode: 13, which: 13, bubbles: true, cancelable: true }});
editor.dispatchEvent(enterEvent);
// Fallback: look for a send button
await new Promise(r => setTimeout(r, 500));
var sendBtns = document.querySelectorAll('button[aria-label="Send"], button[title*="Send"], .lucide-send, button[aria-label="Submit"]');
for (var j = 0; j < sendBtns.length; j++) {{
var btn = sendBtns[j].closest('button') || sendBtns[j];
btn.click();
}}
await new Promise(r => setTimeout(r, 500));
}}
}})()
"""
# ─── Core switch logic ─────────────────────────────────────────────
async def select_model_in_dropdown(evaluate, model_name):
"""Opens the model dropdown and clicks the target model. Returns True on success."""
await evaluate(JS_OPEN_DROPDOWN)
await asyncio.sleep(0.5)
result = await evaluate(js_select_model(model_name))
result_data = json.loads(result)
if not result_data.get('success'):
print(f"FAILED selecting {model_name}: {result_data.get('error')}. Available: {result_data.get('available')}")
return False
print(f"Selected model: {model_name}")
await asyncio.sleep(1)
# Dismiss any stray dialogs
await evaluate(JS_DISMISS_DIALOGS)
await asyncio.sleep(0.5)
return True
async def switch_model(target_model, port, use_trampoline=False):
page = await get_chat_page(port)
print(f"Connected to chat page on port {port}. Target model: {target_model}. Trampoline: {use_trampoline}")
async with websockets.connect(page['webSocketDebuggerUrl']) as ws:
msg_id = 1
async def evaluate(js):
nonlocal msg_id
current_id = msg_id
msg_id += 1
await ws.send(json.dumps({
'id': current_id, 'method': 'Runtime.evaluate',
'params': {'expression': js, 'returnByValue': True, 'userGesture': True, 'awaitPromise': True}
}))
while True:
resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
if resp.get('id') == current_id:
result = resp.get('result', {}).get('result', {})
return result.get('value')
# === PAUSE THE AUTO-CLICKER ===
await evaluate("window.__switchHandoverActive = true")
print("Auto-clicker paused via __switchHandoverActive flag. Waiting 15 seconds for agent to finish generating before switching...")
try:
# Wait for the agent to finish its turn so the UI becomes IDLE and unlocks the model selector
await asyncio.sleep(15)
# ═══════════════════════════════════════════════════════════
# TRAMPOLINE HOP (only if --trampoline flag is set)
# ═══════════════════════════════════════════════════════════
if use_trampoline:
print(f"TRAMPOLINE: Switching to {TRAMPOLINE_MODEL} first...")
if not await select_model_in_dropdown(evaluate, TRAMPOLINE_MODEL):
print("FAILED: Could not select trampoline model. Aborting.")
sys.exit(1)
print("Sending '.' to Flash to commit the model change...")
await evaluate(js_send_message(".\\n"))
print("Waiting 20 seconds for Flash to finish responding...")
await asyncio.sleep(20)
# ═══════════════════════════════════════════════════════════
# ACTUAL HOP: Select the real target model.
# ═══════════════════════════════════════════════════════════
print(f"Selecting target model: {target_model}...")
if not await select_model_in_dropdown(evaluate, target_model):
print(f"FAILED: Could not select target model {target_model}. Aborting.")
sys.exit(1)
await asyncio.sleep(1)
# Send the auto-resume message to the target engine
print(f"SUCCESS: Switched to {target_model}. Sending auto-resume message...")
await evaluate(js_send_message(
"Auto-resuming workflow under new engine. Verify the switch by checking for USER_SETTINGS_CHANGE in your metadata, then continue with your tasks\\n"
))
finally:
# === UNPAUSE THE AUTO-CLICKER ===
await evaluate("window.__switchHandoverActive = false")
print("Auto-clicker resumed via __switchHandoverActive = false")
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python switch_to_other.py <TARGET_MODEL> [--trampoline] or python switch_to_other.py --check")
sys.exit(1)
if sys.argv[1] == '--check':
port = find_active_cdp_port()
if port:
print(f"ACTIVE_CDP_PORT={port}")
sys.exit(0)
else:
print("ACTIVE_CDP_PORT=None")
sys.exit(1)
target = sys.argv[1]
trampoline = '--trampoline' in sys.argv
port = find_active_cdp_port()
if not port:
print("FAILED: No active CDP debugging port found on localhost (ports 8997-9003 are offline).")
sys.exit(1)
asyncio.run(switch_model(target, port, use_trampoline=trampoline))