← Back to Workflow
Internal

Switch To Other

Python script that physically switches the active AI engine in the Antigravity UI using browser automation.

import urllib.request, json, asyncio, sys
import socket

try:
    import websockets
except ImportError:
    import subprocess
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'websockets'])
    import websockets

BASE_PORT = 9000
PORT_RANGE = 3
TRAMPOLINE_MODEL = "Gemini 3.5 Flash"

def find_active_cdp_port():
    ports = [BASE_PORT]
    for port in range(BASE_PORT - PORT_RANGE, BASE_PORT + PORT_RANGE + 1):
        if port != BASE_PORT:
            ports.append(port)
            
    for port in ports:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(0.3)
                s.connect(('127.0.0.1', port))
                return port
        except Exception:
            pass
    return None

async def get_chat_page(port):
    try:
        data = json.loads(urllib.request.urlopen(f'http://127.0.0.1:{port}/json/list', timeout=3).read())
    except Exception:
        print(f"FAILED: Cannot reach CDP on port {port}")
        sys.exit(1)
        
    candidates = []
    for t in data:
        if t.get('type') == 'page' and 'workbench.html' in t.get('url', ''):
            if 'jetski' not in t.get('url', ''):
                candidates.append(t)
                
    if not candidates:
        print(f"FAILED: No active chat page found on port {port}")
        sys.exit(1)

    if len(candidates) == 1:
        return candidates[0]

    # Multiple workbench windows open — find the focused one
    async def _check_focus(candidate):
        try:
            async with websockets.connect(candidate['webSocketDebuggerUrl']) as ws:
                await ws.send(json.dumps({
                    'id': 999, 'method': 'Runtime.evaluate',
                    'params': {'expression': 'document.hasFocus()', 'returnByValue': True}
                }))
                resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=5))
                return resp.get('result', {}).get('result', {}).get('value', False)
        except Exception:
            return False

    for c in candidates:
        if await _check_focus(c):
            return c

    # Fallback to first candidate if none has focus
    return candidates[0]


# ─── Reusable helpers ───────────────────────────────────────────────

JS_OPEN_DROPDOWN = """
(function() {
    var btns = document.querySelectorAll('*[role="button"], button');
    var targetBtn = null;
    for (var i = 0; i < btns.length; i++) {
        var txt = (btns[i].textContent || "").trim();
        var popup = btns[i].getAttribute('aria-haspopup');
        if (popup && popup !== 'false') {
            if (txt.includes('Gemini') || txt.includes('Claude') || txt.includes('GPT') || txt.includes('Flash') || txt.includes('Pro') || txt.includes('Opus') || txt.includes('Sonnet')) {
                targetBtn = btns[i];
                break;
            }
        }
    }
    if (targetBtn) {
        targetBtn.click();
        return true;
    }
    // Fallback to clicking all popup elements
    for (var i = 0; i < btns.length; i++) {
        if (btns[i].getAttribute('aria-haspopup')) {
            btns[i].click();
        }
    }
    return true;
})()
"""

def js_select_model(model_name):
    return f"""
    (function() {{
        var targetModel = {json.dumps(model_name)};
        
        // Strategy 1: Target popover-item buttons (new Antigravity UI)
        // These are the actual dropdown option buttons inside the model picker popover
        var popoverItems = document.querySelectorAll('button[class*="popover-item"]');
        for (var i = 0; i < popoverItems.length; i++) {{
            // Check the inner span text, not full textContent (which may include badge text like "Fast")
            var innerSpans = popoverItems[i].querySelectorAll('span.text-xs span');
            for (var j = 0; j < innerSpans.length; j++) {{
                if (innerSpans[j].textContent.trim() === targetModel) {{
                    var el = popoverItems[i];
                    ['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
                        el.dispatchEvent(new MouseEvent(eventType, {{
                            bubbles: true,
                            cancelable: true,
                            view: window
                        }}));
                    }});
                    return JSON.stringify({{success: true, strategy: 'popover-item-exact'}});
                }}
            }}
            // Fallback: check full textContent includes
            if (popoverItems[i].textContent.trim().includes(targetModel)) {{
                var el = popoverItems[i];
                ['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
                    el.dispatchEvent(new MouseEvent(eventType, {{
                        bubbles: true,
                        cancelable: true,
                        view: window
                    }}));
                }});
                return JSON.stringify({{success: true, strategy: 'popover-item-includes'}});
            }}
        }}
        
        // Strategy 2: Target role=option or role=menuitem (legacy UI)
        var roleItems = document.querySelectorAll('[role="option"], [role="menuitem"], [role="menuitemradio"]');
        for (var i = 0; i < roleItems.length; i++) {{
            if (roleItems[i].textContent.trim().includes(targetModel)) {{
                var el = roleItems[i];
                ['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
                    el.dispatchEvent(new MouseEvent(eventType, {{
                        bubbles: true,
                        cancelable: true,
                        view: window
                    }}));
                }});
                return JSON.stringify({{success: true, strategy: 'role-item'}});
            }}
        }}
        
        // Strategy 3: Target data-autofocus buttons (another way to identify dropdown items)
        var autofocusItems = document.querySelectorAll('button[data-autofocus]');
        for (var i = 0; i < autofocusItems.length; i++) {{
            if (autofocusItems[i].textContent.trim().includes(targetModel)) {{
                var el = autofocusItems[i];
                ['pointerdown', 'mousedown', 'pointerup', 'mouseup', 'click'].forEach(eventType => {{
                    el.dispatchEvent(new MouseEvent(eventType, {{
                        bubbles: true,
                        cancelable: true,
                        view: window
                    }}));
                }});
                return JSON.stringify({{success: true, strategy: 'data-autofocus'}});
            }}
        }}
        
        // Collect available items for error reporting
        var available = [];
        popoverItems.forEach(function(el) {{ if (el.textContent.trim()) available.push('popover: ' + el.textContent.trim().substring(0, 80)); }});
        roleItems.forEach(function(el) {{ if (el.textContent.trim()) available.push('role: ' + el.textContent.trim().substring(0, 80)); }});
        autofocusItems.forEach(function(el) {{ if (el.textContent.trim()) available.push('autofocus: ' + el.textContent.trim().substring(0, 80)); }});
        return JSON.stringify({{error: 'Target model not found in dropdown', available: available}});
    }})()
    """

JS_DISMISS_DIALOGS = """
(function() {
    var escapeEvent = new KeyboardEvent('keydown', { key: 'Escape', bubbles: true, cancelable: true });
    document.body.dispatchEvent(escapeEvent);
    setTimeout(() => document.body.dispatchEvent(escapeEvent), 50);
})()
"""

def js_send_message(text):
    return f"""
    (async function() {{
        var editors = Array.from(document.querySelectorAll('textarea, div[contenteditable="true"]'))
            .filter(e => !e.className.includes('xterm') && !e.classList.contains('xterm-helper-textarea') && 
                         (e.closest('.pane-composite-part') != null || e.closest('.antigravity-agent-side-panel') != null || e.closest('.auxiliarybar') != null));
        
        var editor = editors[0];
        if (!editor) {{
            var all = document.querySelectorAll('textarea, div[contenteditable="true"]');
            editor = all[all.length - 1]; // Fallback to last one
        }}
        
        if (editor) {{
            editor.focus();
            document.execCommand('selectAll', false, null);
            document.execCommand('insertText', false, {json.dumps(text)});
            
            await new Promise(r => setTimeout(r, 500));
            var enterEvent = new KeyboardEvent('keydown', {{ key: 'Enter', code: 'Enter', keyCode: 13, which: 13, bubbles: true, cancelable: true }});
            editor.dispatchEvent(enterEvent);
            
            // Fallback: look for a send button
            await new Promise(r => setTimeout(r, 500));
            var sendBtns = document.querySelectorAll('button[aria-label="Send"], button[title*="Send"], .lucide-send, button[aria-label="Submit"]');
            for (var j = 0; j < sendBtns.length; j++) {{
                var btn = sendBtns[j].closest('button') || sendBtns[j];
                btn.click();
            }}
            
            await new Promise(r => setTimeout(r, 500));
        }}
    }})()
    """


# ─── Core switch logic ─────────────────────────────────────────────

async def select_model_in_dropdown(evaluate, model_name):
    """Opens the model dropdown and clicks the target model. Returns True on success."""
    await evaluate(JS_OPEN_DROPDOWN)
    await asyncio.sleep(0.5)

    result = await evaluate(js_select_model(model_name))
    result_data = json.loads(result)
    if not result_data.get('success'):
        print(f"FAILED selecting {model_name}: {result_data.get('error')}. Available: {result_data.get('available')}")
        return False

    print(f"Selected model: {model_name}")
    await asyncio.sleep(1)

    # Dismiss any stray dialogs
    await evaluate(JS_DISMISS_DIALOGS)
    await asyncio.sleep(0.5)
    return True


async def switch_model(target_model, port, use_trampoline=False):
    page = await get_chat_page(port)
    print(f"Connected to chat page on port {port}. Target model: {target_model}. Trampoline: {use_trampoline}")

    async with websockets.connect(page['webSocketDebuggerUrl']) as ws:
        msg_id = 1
        async def evaluate(js):
            nonlocal msg_id
            current_id = msg_id
            msg_id += 1
            await ws.send(json.dumps({
                'id': current_id, 'method': 'Runtime.evaluate',
                'params': {'expression': js, 'returnByValue': True, 'userGesture': True, 'awaitPromise': True}
            }))
            while True:
                resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
                if resp.get('id') == current_id:
                    result = resp.get('result', {}).get('result', {})
                    return result.get('value')

        # === PAUSE THE AUTO-CLICKER ===
        await evaluate("window.__switchHandoverActive = true")
        print("Auto-clicker paused via __switchHandoverActive flag. Waiting 15 seconds for agent to finish generating before switching...")
        
        try:
            # Wait for the agent to finish its turn so the UI becomes IDLE and unlocks the model selector
            await asyncio.sleep(15)

            # ═══════════════════════════════════════════════════════════
            # TRAMPOLINE HOP (only if --trampoline flag is set)
            # ═══════════════════════════════════════════════════════════
            if use_trampoline:
                print(f"TRAMPOLINE: Switching to {TRAMPOLINE_MODEL} first...")
                
                if not await select_model_in_dropdown(evaluate, TRAMPOLINE_MODEL):
                    print("FAILED: Could not select trampoline model. Aborting.")
                    sys.exit(1)

                print("Sending '.' to Flash to commit the model change...")
                await evaluate(js_send_message(".\\n"))

                print("Waiting 20 seconds for Flash to finish responding...")
                await asyncio.sleep(20)

            # ═══════════════════════════════════════════════════════════
            # ACTUAL HOP: Select the real target model.
            # ═══════════════════════════════════════════════════════════
            print(f"Selecting target model: {target_model}...")
            
            if not await select_model_in_dropdown(evaluate, target_model):
                print(f"FAILED: Could not select target model {target_model}. Aborting.")
                sys.exit(1)

            await asyncio.sleep(1)

            # Send the auto-resume message to the target engine
            print(f"SUCCESS: Switched to {target_model}. Sending auto-resume message...")
            await evaluate(js_send_message(
                "Auto-resuming workflow under new engine. Verify the switch by checking for USER_SETTINGS_CHANGE in your metadata, then continue with your tasks\\n"
            ))

        finally:
            # === UNPAUSE THE AUTO-CLICKER ===
            await evaluate("window.__switchHandoverActive = false")
            print("Auto-clicker resumed via __switchHandoverActive = false")

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python switch_to_other.py <TARGET_MODEL> [--trampoline] or python switch_to_other.py --check")
        sys.exit(1)
        
    if sys.argv[1] == '--check':
        port = find_active_cdp_port()
        if port:
            print(f"ACTIVE_CDP_PORT={port}")
            sys.exit(0)
        else:
            print("ACTIVE_CDP_PORT=None")
            sys.exit(1)
    
    target = sys.argv[1]
    trampoline = '--trampoline' in sys.argv
    
    port = find_active_cdp_port()
    if not port:
        print("FAILED: No active CDP debugging port found on localhost (ports 8997-9003 are offline).")
        sys.exit(1)
        
    asyncio.run(switch_model(target, port, use_trampoline=trampoline))

This is used in: