Skip to content

Thanks for the solid work , have some sso cookies #1

@Mahkhmood8

Description

@Mahkhmood8

azureuser@taiwan_not_a_country:~$ head sso_cookies.txt -n 20
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMzQzMjYxOGYtYzZmNi00YzI1LTk5ZWUtMTE0YzBiZjU0YmQzIn0.9izGRII9Ssd9SLoDhnwfaFnsGQse7kpDW6tF9wuAEX4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNmY4MmVjZjQtOTI5Mi00NGJjLTljYmYtNGExODQ2YWExYTE0In0.ZdkWyvogOzAQn1VTlAdwXfR7o8HinwQEWTVk3TTZEVU
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNDFmNmMwZmItNjU4NC00ZDJkLTljN2ItYzQwNGUzZmE0MGU1In0.Jf8o8cpji94PQPnKzkskuYKVZdfcQzApMP3MN6AWOBU
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNGQ5YjkzMTItMzNjNC00Zjc3LWIwZDUtYjQzMjcyNzA1NDlmIn0.7ZF10vsAKEIflwDaADiaOcN5JOkDf6tc36qttzFDCcE
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiYWQ4MmNlOGUtNDA3MC00YTg1LWI2MjYtOWMwYzI3NDNiOGQwIn0.VEsUgNidDCU5bg-VfIhMtn57psDWFTm-_Php3QEf_a4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiYmU2Nzk1NzQtMzNmNy00NzEzLTllZWEtNGIyODdjZTY2MTU1In0.zn1zItYHDJcZQ4u1o5RYo2168F8_d4nnodSJ2LKGSP4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNDMzNDJmNTMtZTc0YS00ZGViLWIzZGEtYWE0NTZjNDVkMTdkIn0.L7MmnMPwHZpbCoSdPHL2XL-xMTOBFjysFLwUajqMcrQ
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiYWRhM2NjOTYtNTIyMC00MGE0LTg1MmUtOWVlNmViM2M4NTM5In0.M2mTACO5H46CCWRhTJtANy1C0kA_0eLmozXDknixoHg
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMDM0ZDFiMGMtMTk3Ny00NTRlLTk1MjItZTdkM2YwYmRhNzFlIn0.Kgmm1CEwuLH76Stqv5wKJ76llom2JdPk5Lt4xp7PC14
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMWY2OGFiMmYtMmI3MS00ODM3LWI1NmYtYWQ2MTNjNTBmMzRiIn0.da_gJKtMIMD-2kZdfjtMGQ5cZ61VzjKWbzo9D7277Gg
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiOTljMzdmNjgtMzhhNi00OTkyLThmOGItOWQ5MDc0YjE5YmY0In0.igDf3IZI6H_GJQRJUH6z2skVWfwMzTCdwF7tCk-1SCQ
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNjBmMjVjZTUtMGRiZS00NmJkLWJmNTEtYmJlMzhmOGY5ZTViIn0.uM45wVJXZeiQBcMCfKdSeEJNK9RwwLCWfomoz0osnq4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiZWM4YWM4NDAtZTQ4OC00YzExLWE1MWUtOTc1ZDEwNGQ4NTc1In0.hJf4-qWlA3dpwGjvjGIWHHp1XGPDQfXqjyDxRrDemd0
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMjBiOWZlYTMtNmU5Yy00ODdmLTg0YzktZmM2NjRlZGFlMTRmIn0.HHoafqux29189hRomyv9JS10eIabxQAL-4YyTO5uUXY
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMzkyY2FmMmEtZDRjYS00NmZmLWI0OTAtYmY3ZmJlZGZjYzllIn0.wHq06lcEPpxPywB7cybz81N5greeQRxk85QA8pkPCK0
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiZGY0ODgzN2MtZWQ1Ni00YzdkLWEyM2UtYTc4MDMzOWRlMzI4In0.ft-CMvqcXTruPKbOGpmLDWMk9Dlz1ONSQqrMP9qiVKg
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMjQ2MzZmMzktODI1Zi00NzZlLWFkMTctMWVhMzYwNzBhOGU4In0.nJXGoREcccnjxmXQNcpmcFvTCADrW2P1twn0g-TLk40
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiN2EwOWY4OGItZGU2MS00NDNjLWE2NTAtYzJkYWFkNjJjOGUzIn0.paMl8GbHuYoMfv0TC_8gUqINtvGllO4UJKMqqYbRmEk
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiM2I0MTRlYTktZDY3Ny00ZTE5LWI3YjMtN2Q3NDVhYTA3MzgzIn0.VSiZDWmcoAEqVP4rZg_T19HyfRyvoFm7z1kMqJW-c6M
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMjljMzI3YWQtN2Q3YS00ZTgyLWI2OTUtMDhiZTNjMjBlYzg4In0.v8O8KgQ-GnTA89_XZ8qZaSGhTv0tcD3-a0xMMEtTU6s

py
# stress test on grok api 

import json
import uuid
import asyncio
import base64
from typing import Optional, Tuple
import random

import numpy as np
import string
import requests
from curl_cffi import AsyncSession

#import config
#from config import GROK_ASSETS_URL, GROK_CONVO_URL, GROK_POST_URL, GROK_UPLOAD_URL


# Base URLs for grok.com API
GROK_UPLOAD_URL = "https://grok.com/rest/app-chat/upload-file"
GROK_POST_URL = "https://grok.com/rest/media/post/create"
GROK_CONVO_URL = "https://grok.com/rest/app-chat/conversations/new"
GROK_ASSETS_URL = "https://assets.grok.com"


# ============================================================================
# Configuration
# ============================================================================
MAX_WORKERS = 20  # Maximum concurrent tasks
TOTAL_TASKS = 2000  # Total number of tasks to execute


# ============================================================================
# Utility Functions
# ============================================================================

async def get_statsig_id(proxies: Optional[dict] = None) -> str:
    """Fetch a random x_statsig_id from API (async)."""
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, 
        lambda: requests.get("https://rui.soundai.ee/x.php", proxies=proxies, timeout=5).json()["x_statsig_id"]
    )

def random_string(length: int = 8) -> str:
    """Generate a random alphanumeric string of given length."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


# ============================================================================
# Grok API Functions
# ============================================================================

async def generate_async(
    convo_url: str,
    cookies: dict,
    proxies: Optional[str] = None,
) -> dict:
    """
    Send async request to Grok API and stream response.
    
    Args:
        convo_url: Grok conversation endpoint URL
        cookies: Authentication cookies
        proxies: Proxy configuration dict
        
    Returns:
        Last response chunk as dict
    """
    statsig_id = await get_statsig_id(proxies)
    
    headers = {
        "x-statsig-id": statsig_id,
        "x-xai-request-id": str(uuid.uuid4()),
    }
    
    payload = {
        'temporary': False,
        'modelName': 'grok-4',
        'message': f'''You are an expert Python programmer. Your task is to first decode and then complete the Python program and pass all tests. The content within the angle brackets (<>) represents a number in a specific base. The content within the parentheses () immediately following indicates the value of that base. This correspond to an ASCII encoding of a character.
    
### Format: You will use the following starter code to write the solution to the problem and enclose your code within delimiters. Don't forget to specify it's PythonSix points $A, think harder {random_string()}, C, D, E$ and $F$ lie in a straight line in that {random_string()}. Suppose that $G$ is a point not on the line and that $AC = {random_string()}, $BD = {random_string()}/22$, $CE = 31$, $DF = 33$, $AF = 73$, $CG = 40$, and $DG = 30$. Find the area of $\\\\triangle BGE$''',
        'fileAttachments': [],
        'imageAttachments': [],
        'disableSearch': False,
        'enableImageGeneration': True,
        'returnImageBytes': False,
        'returnRawGrokInXaiRequest': False,
        'enableImageStreaming': True,
        'imageGenerationCount': 2,
        'forceConcise': False,
        'toolOverrides': {},
        'enableSideBySide': True,
        'sendFinalMetadata': True,
        'isReasoning': False,
        'webpageUrls': [],
        'disableTextFollowUps': False,
        'responseMetadata': {
            'modelConfigOverride': {
                'modelMap': {},
            },
            'requestModelDetails': {
                'modelId': 'grok-4',
            },
        },
        'disableMemory': False,
        'forceSideBySide': False,
        'modelMode': 'MODEL_MODE_EXPERT',# <--- lol
        'isAsyncChat': True,
        'disableSelfHarmShortCircuit': False,
    }
    
    last_response = {}
    
    async with AsyncSession() as session:
        resp = await session.post(
            GROK_CONVO_URL,
            data=json.dumps(payload),
            impersonate="chrome136",
            stream=True,
            cookies=cookies,
            proxies=proxies,
            headers=headers,
        )

        buffer = ""
        
        async for chunk in resp.aiter_content():
            buffer += chunk.decode("utf-8")

            while buffer:
                buffer = buffer.lstrip()
                if not buffer:
                    break
                
                try:
                    obj, idx = json.JSONDecoder().raw_decode(buffer)
                    response = obj.get("result", {}).get("response", {})
                    print(response)
                    last_response = response
                    
                    if "responseId" in response:
                        break
                        
                    buffer = buffer[idx:]
                except json.JSONDecodeError:
                    break
    
    return last_response


async def upload_file(
    proxies: Optional[str],
    payload: dict,
    cookies: dict
) -> Tuple[str, str]:
    """
    Upload file to grok.com and create media post.
    
    Args:
        proxies: Proxy configuration
        payload: File upload payload with fileName, fileMimeType, content
        cookies: Authentication cookies
        
    Returns:
        Tuple of (media_url, post_id)
    """
    async with AsyncSession() as session:
        # Step 1: Upload file
        resp = await session.post(
            GROK_UPLOAD_URL,
            cookies=cookies,
            json=payload,
            proxies=proxies,
            impersonate="chrome133a",
        )
        print("Upload Status:", resp.status_code)
        print("Upload Response:", resp.text)

        resp.raise_for_status()
        file_uri = resp.json().get("fileUri")
        if not file_uri:
            raise ValueError("File URI not found in response")

        # Step 2: Create media post
        json_data = {
            "mediaType": "MEDIA_POST_TYPE_IMAGE",
            "mediaUrl": f"{GROK_ASSETS_URL}/{file_uri}",
        }
        resp = await session.post(
            GROK_POST_URL,
            cookies=cookies,
            json=json_data,
            proxies=proxies,
            impersonate="chrome133a",
        )
        resp.raise_for_status()
        post = resp.json().get("post", {})
        return post.get("mediaUrl"), post.get("id")


# ============================================================================
# Fast Pool Management (Lock-free for reads)
# ============================================================================

class FastPool:
    """Lock-free pool for fast concurrent access."""
    
    def __init__(self, items: list):
        self.items = items.copy()
        self.lock = asyncio.Lock()
        self.semaphore = asyncio.Semaphore(len(items))
    
    async def acquire(self):
        """Get an item from the pool (fast, minimal locking)."""
        await self.semaphore.acquire()
        async with self.lock:
            if not self.items:
                raise ValueError("Pool is empty")
            # Pop from end is O(1) vs random index O(n)
            return self.items.pop()
    
    async def release(self, item):
        """Return an item to the pool."""
        async with self.lock:
            self.items.append(item)
        self.semaphore.release()
    
    def size(self):
        """Get current pool size (not thread-safe but useful for debugging)."""
        return len(self.items)


# ============================================================================
# SSO Cookie Pool
# ============================================================================

sso_pool = FastPool(config.SSO_COOKIES.copy())


async def get_sso_cookie() -> dict:
    """Acquire an SSO cookie from the pool."""
    cookie = await sso_pool.acquire()
    # Ensure it's returned as a dict format that curl_cffi expects
    if isinstance(cookie, str):
        return {"sso": cookie}
    return cookie


async def release_sso_cookie(cookie: dict) -> None:
    """Return an SSO cookie to the pool."""
    # Extract the actual cookie value if it's wrapped in a dict
    actual_cookie = cookie.get('sso') if isinstance(cookie, dict) and 'sso' in cookie else cookie
    await sso_pool.release(actual_cookie)


# ============================================================================
# Proxy Pool
# ============================================================================

def build_proxy_pool(ip_list: list, port_range: Tuple[int, int], user: str, password: str) -> list:
    """Build a pool of proxy URLs from IP list and port range."""
    pool = []
    for ip in ip_list:
        for port in range(port_range[0], port_range[1] + 1):
            proxy_url = f"http://{user}:{password}@{ip}:{port}"
            pool.append(proxy_url)
    return pool


# Initialize proxy pool
proxy_list = build_proxy_pool(
    ip_list=["89.117.94.75"],
    port_range=(30001, 30099),
    user="z",
    password="GET YOUR OWN PROXIES " I use https://www.ethernetservers.com/ 3dollas plan and 
 ./ipv6-proxy-server.sh -s 64 -c 400 -u z -p zero3 -t http -r 5
(free /64 ipv6 great for account creation too )
)

proxy_pool = FastPool(proxy_list)


async def get_proxy() -> str:
    """Acquire a proxy from the pool."""
    return await proxy_pool.acquire()


async def release_proxy(proxy: str) -> None:
    """Return a proxy to the pool."""
    await proxy_pool.release(proxy)


# ============================================================================
# Worker & Main Execution
# ============================================================================

async def worker(semaphore: asyncio.Semaphore, task_id: int) -> Optional[dict]:
    """
    Execute a single Grok API task with proxy and cookie management.
    
    Args:
        semaphore: Concurrency control semaphore
        task_id: Unique identifier for this task
        
    Returns:
        Result dict or None if task fails
    """
    # Wait for a worker slot to become available FIRST
    async with semaphore:
        # Now acquire resources (only MAX_WORKERS will do this at once)
        proxy_url = await get_proxy()
        cookies = await get_sso_cookie()
        proxies = {"http": proxy_url, "https": proxy_url}
        
        print(f"[Task {task_id}] Started with proxy: {proxy_url}")
        
        try:
            result = await asyncio.wait_for(
                generate_async(
                    proxies=proxies,
                    convo_url="https://grok.com/rest/app-chat/conversations/new",
                    cookies=cookies,
                ),
                timeout=200
            )
            print(f"[Task {task_id}] โœ“ Finished successfully")
            return result
        except asyncio.TimeoutError:
            print(f"[Task {task_id}] โœ— Timed out")
            return None
        except Exception as e:
            print(f"[Task {task_id}] โœ— Failed: {e}")
            return None
        finally:
            # Always release resources
            await release_proxy(proxy_url)
            await release_sso_cookie(cookies)


async def main():
    """
    Main execution function - launches tasks with concurrency control.
    """
    print(f"๐Ÿš€ Launching {TOTAL_TASKS} tasks with max_workers={MAX_WORKERS}...")
    print(f"๐Ÿ“Š Available proxies: {proxy_pool.size()}")
    print(f"๐Ÿ“Š Available cookies: {sso_pool.size()}")

    # Create semaphore for concurrency control
    semaphore = asyncio.Semaphore(MAX_WORKERS)

    # Create and launch ALL tasks at once
    tasks = [
        asyncio.create_task(worker(semaphore, i + 1))
        for i in range(TOTAL_TASKS)
    ]

    # Wait for completion
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    successful = sum(1 for r in results if r is not None and not isinstance(r, Exception))
    print(f"\nโœ… Completed: {successful}/{TOTAL_TASKS} tasks successful")
    
    return results


if __name__ == "__main__":
    asyncio.run(main())```

Activity

Mahkhmood8

Mahkhmood8 commented on Oct 26, 2025

@Mahkhmood8
Author

in a private manner I can give you access to all of my sso_cookies
which is 20k -- I only ran the sso generator for like 6 hours lol

Image
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

      Development

      No branches or pull requests

        Participants

        @Mahkhmood8

        Issue actions

          Thanks for the solid work , have some sso cookies ยท Issue #1 ยท AdamBankz/grok-api-wrapper