Commands Code Text Copy to Drive
Notebook

Gemini

AIアバター動画自動生成

商品紹介系の動画を自動作成するワークフローです。 上から順にセルを実行すると、企画案の生成から Talking Photo 動画の生成まで自動で行われます。

ご利用前の注意

  • HeyGen / ElevenLabs / Google Gemini の API キーが必要です。
  • 生成・削除を含め外部 API を直接呼び出すため、無料枠やレート制限にご注意ください。
  • 一層表現豊かな動画は HeyGenの AVATAR IV API を使用すると実現できますが、Freeプランでは利用できず、約月100ドル~のProプランに契約すると利用可能になります。

Gemini

# 必要ライブラリのインストール
!pip install -q requests httpx pydantic tqdm ipywidgets pillow
!pip -q install -U google-genai
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.6/1.6 MB 10.4 MB/s eta 0:00:00
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 45.4/45.4 kB 1.6 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 244.7/244.7 kB 6.0 MB/s eta 0:00:00

Gemini

import os
import json
import time
import base64
import mimetypes
import shutil
import uuid
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Optional, Any

import requests
from tqdm.auto import tqdm
import ipywidgets as widgets
from IPython.display import display, HTML, Image, Audio, Video, clear_output
from google.colab import files
from PIL import Image as PILImage
import google.generativeai as genai
from pydantic import BaseModel, Field, validator

RUN_ROOT = Path('/content/ai-avatar-ads-colab')
ASSETS_DIR = RUN_ROOT / 'assets'
OUTPUTS_DIR = RUN_ROOT / 'outputs'

for directory in [RUN_ROOT, ASSETS_DIR, OUTPUTS_DIR]:
    directory.mkdir(parents=True, exist_ok=True)

print('🔧 作業ディレクトリ: ', RUN_ROOT)
🔧 作業ディレクトリ:  /content/ai-avatar-ads-colab

Gemini
import sys, time, json, traceback, mimetypes
from contextlib import contextmanager
import ipywidgets as widgets
from IPython.display import display, Image as DImage, Audio as DAudio, FileLink, HTML

class ProgressLogger:
    def __init__(selftotal_stepsint):
        self.total_steps = total_steps
        self.current = 0
        self.stage_label = widgets.HTML("<b>待機中...</b>")
        self.bar = widgets.IntProgress(value=0, min=0, max=total_steps, description='進捗')
        self.log_out = widgets.Output(layout=widgets.Layout(border='1px solid #ddd', height='260px', overflow_y='auto'))
        self.container = widgets.VBox([widgets.HBox([self.bar, widgets.HTML("&nbsp;")]), self.stage_label, self.log_out])
        display(self.container)

    def info(selfmsgstr):
        with self.log_out:
            print(msg); sys.stdout.flush()

    def error(selfmsgstr):
        with self.log_out:
            print(f"❌ {msg}"); sys.stdout.flush()

    def success(selfmsgstr):
        with self.log_out:
            print(f"✅ {msg}"); sys.stdout.flush()

    def kv(selftitlestr, **kwargs):
        with self.log_out:
            print(f"--- {title} ---")
            for k, v in kwargs.items():
                print(f"{k}{v}")
            print("")
            sys.stdout.flush()

    def json(selftitlestrdatamax_charsint = 2000):
        with self.log_out:
            print(f"--- {title} ---")
            s = json.dumps(data, ensure_ascii=False, indent=2)
            if len(s) > max_chars:
                print(s[:max_chars] + "\n... (truncated)")
            else:
                print(s)
            print("")
            sys.stdout.flush()

    def image(selfpathstrtitlestr = ""):
        with self.log_out:
            if title: print(f"--- {title} ---")
            display(DImage(filename=path))
            print("")
            sys.stdout.flush()

    def audio(selfpathstrtitlestr = ""):
        with self.log_out:
            if title: print(f"--- {title} ---")
            display(DAudio(filename=path, autoplay=False))
            print("")
            sys.stdout.flush()

    def file(selfpathstrtitlestr = ""):
        with self.log_out:
            if title: print(f"--- {title} ---")
            display(FileLink(path))
            print("")
            sys.stdout.flush()

    @contextmanager
    def step(selftitlestr):
        self.current += 1
        self.stage_label.value = f"<b>{self.current}/{self.total_steps}{title}</b>"
        start = time.time()
        self.info(f"▶ {title} ...")
        try:
            yield
        except Exception as e:
            dur = time.time() - start
            tb = traceback.format_exc(limit=3)
            self.error(f"{title} で例外: {type(e).__name__}{e}{dur:.1f}s)"); self.error(tb.strip()); raise
        else:
            dur = time.time() - start
            self.success(f"{title} 完了({dur:.1f}s)")
            self.bar.value = self.current

Gemini
import os
import getpass

def prompt_api_key(namestr):
    if os.environ.get(name):
        print(f"✅ {name} は既に設定されています")
        return
    while True:
        value = getpass.getpass(f"{name}: ").strip()
        if value:
            os.environ[name] = value
            print(f"✅ {name} を設定しました")
            break
        print("⚠️ 空文字は設定できません。もう一度入力してください。")

print("🔑 必要な API キーを順番に入力してください (入力内容は表示されません)。")
for key in ["HEYGEN_API_KEY""ELEVENLABS_API_KEY""GOOGLE_API_KEY"]:
    prompt_api_key(key)

print("ℹ️ もう一度設定したい場合は、このセルを再実行してください。")

🔑 必要な API キーを順番に入力してください (入力内容は表示されません)。
HEYGEN_API_KEY: ··········
✅ HEYGEN_API_KEY を設定しました
ELEVENLABS_API_KEY: ··········
✅ ELEVENLABS_API_KEY を設定しました
GOOGLE_API_KEY: ··········
✅ GOOGLE_API_KEY を設定しました
ℹ️ もう一度設定したい場合は、このセルを再実行してください。

Gemini

# 設定値(必要に応じて変更してください)
CONFIG = {
    'proposal_count'3,
    'video_duration_sec'30,
    'video_aspect''portrait',
    'delete_resources'True,
    'enable_tts_optimization'True
}

DIMENSIONS = {
    'portrait': {'width'720'height'1280'label''720x1280'},
    'landscape': {'width'1280'height'720'label''1280x720'}
}

print('🛠️ 現在の設定:')
for key, value in CONFIG.items():
    print(f'  - {key}{value}')
🛠️ 現在の設定:
  - proposal_count: 3
  - video_duration_sec: 30
  - video_aspect: portrait
  - delete_resources: True
  - enable_tts_optimization: True

Gemini

    # データモデル定義
    class RunInfo(BaseModel):
        run_id: str
        created_at: str
        version: str = '1.0.0'

    class InputData(BaseModel):
        product_name: str
        product_image_path: str
        avatar_image_path: Optional[str] = None
        meta_text: str

    class PlanProposal(BaseModel):
        id: str
        summary: str
        value_props: List[str]
        target: str
        tone: Optional[str] = None
        visual_ideas: Optional[List[str]] = None

        @validator('value_props', pre=True)
        def ensure_list(clsvalue):
            if isinstance(value, list):
                return [str(v) for v in value]
            if isinstance(value, str):
                return [v.strip() for v in value.split(''if v.strip()]
            return []

    class Script(BaseModel):
        text: str
        duration_estimate: Optional[int] = None
        word_count: Optional[int] = None

    class VoiceSelection(BaseModel):
        voice_id: str
        reason: str
        model_id: str = 'eleven_multilingual_v2'
        meta: Optional[Dict[str, Any]] = None

    class VoiceData(VoiceSelection):
        audio_path: Optional[str] = None
        optimized_text: Optional[str] = None

    class VideoData(BaseModel):
        video_id: str
        url: str
        path: str
        platform: str

    class Manifest(BaseModel):
        run: RunInfo
        input: InputData
        plans: List[PlanProposal] = Field(default_factory=list)
        scripts: Dict[str, Script] = Field(default_factory=dict)
        avatar: Dict[str, Any] = Field(default_factory=dict)
        composed_images: Dict[str, Dict[strstr]] = Field(default_factory=dict)
        voices: Dict[str, VoiceData] = Field(default_factory=dict)
        videos: Dict[str, Dict[str, VideoData]] = Field(default_factory=dict)

        def to_json(selfpathPath):
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, 'w', encoding='utf-8'as f:
                json.dump(self.dict(), f, ensure_ascii=False, indent=2)

    print('✅ データモデルを定義しました')
✅ データモデルを定義しました
/tmp/ipython-input-556695435.py:21: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  @validator('value_props', pre=True)

Gemini
# === Gemini 連携 ===
from typing import Any, Dict, List, Optional
import json, mimetypes
from google import genai
from google.genai import types as genai_types

class GeminiProvider:
    def __init__(selfapi_keystr):
        self.client = genai.Client(api_key=api_key)
        self.text_preferred = ["gemini-2.5-flash""gemini-2.0-flash""gemini-2.5-flash-lite"]
        self.image_preferred = ["gemini-2.5-flash-image-preview"]

    def _resolve_model(selfpreferredList[str]) -> str:
        available = []
        for m in self.client.models.list():
            actions = getattr(m, "supported_actions"None)
            if actions and "generateContent" in actions:
                available.append(m.name.split("/"1)[-1])
        for base in preferred:
            for n in available:
                if n.startswith(base):
                    return n
        if available:
            return available[0]
        raise RuntimeError("generateContent 対応モデルが見つかりません。")

    def generate_json(
        self,
        promptstr,
        system_instructionOptional[str] = None,
        response_schemaOptional[dict] = None,
    ) -> Any:
        model_name = self._resolve_model(self.text_preferred)
        cfg = genai_types.GenerateContentConfig(
            response_mime_type="application/json",
        )
        if response_schema is not None:
            cfg.response_schema = response_schema
        if system_instruction:
            cfg.system_instruction = system_instruction

        res = self.client.models.generate_content(
            model=model_name,
            contents=prompt,
            config=cfg,
        )
        text = (res.text or "").strip()
        if not text:
            raise RuntimeError("Gemini から空の応答が返されました")

        try:
            return json.loads(text)
        except json.JSONDecodeError:
            if "```" in text:
                s = text.find("```") + 3
                e = text.find("```", s)
                return json.loads(text[s:e].strip())
            raise

    def generate_composed_image(selfpromptstrimage_pathsList[str]) -> bytes:
        model_name = self._resolve_model(self.image_preferred)
        parts: List[Any] = [prompt]
        for p in image_paths:
            mime = mimetypes.guess_type(p)[0or "image/png"
            with open(p, "rb"as f:
                img_bytes = f.read()
            parts.append(genai_types.Part.from_bytes(data=img_bytes, mime_type=mime))

        res = self.client.models.generate_content(model=model_name, contents=parts)
        for cand in getattr(res, "candidates", []) or []:
            for part in cand.content.parts:
                if getattr(part, "inline_data"Noneand part.inline_data.data:
                    return part.inline_data.data
        raise RuntimeError("Gemini が有効な画像データを返しませんでした")

Gemini

# ElevenLabs 連携

class ElevenLabsProvider:
    def __init__(selfapi_keystr):
        self.api_key = api_key
        self.base_url = 'https://api.elevenlabs.io/v1'

    def list_voices(self) -> List[Dict[str, Any]]:
        response = requests.get(
            f'{self.base_url}/voices',
            headers={'xi-api-key'self.api_key},
            timeout=30
        )
        response.raise_for_status()
        return response.json().get('voices', [])

    def synthesize(selftextstrvoice_idstrmodel_idstr = 'eleven_multilingual_v2') -> bytes:
        payload = {
            'text': text,
            'model_id': model_id,
            'voice_settings': {
                'stability'0.5,
                'similarity_boost'0.75,
                'style'0.2,
                'use_speaker_boost'True
            }
        }
        response = requests.post(
            f'{self.base_url}/text-to-speech/{voice_id}',
            headers={
                'xi-api-key'self.api_key,
                'Content-Type''application/json',
                'Accept''audio/mpeg'
            },
            json=payload,
            timeout=60
        )
        response.raise_for_status()
        return response.content

print('✅ ElevenLabs プロバイダを準備しました')
✅ ElevenLabs プロバイダを準備しました

Gemini
# HeyGen 連携

import requests, json, mimetypes
from typing import Optional, Dict

HEYGEN_API_BASE = "https://api.heygen.com"
HEYGEN_UPLOAD_BASE = "https://upload.heygen.com"

class HeyGenHTTPError(RuntimeError):
    pass

class HeyGenProvider:
    def __init__(selfapi_keystrloggerOptional[ProgressLogger] = None):
        self.api_key = api_key
        self.logger = logger

    def _request(selfmethodstrurlstr, *, headers=None, **kwargs) -> Dict:
        h = {"X-Api-Key"self.api_key}
        if headers: h.update(headers)
        resp = requests.request(method, url, headers=h, timeout=60, **kwargs)
        try:
            payload = resp.json()
        except Exception:
            payload = {"raw": resp.text}

        if self.logger:
            self.logger.info(f"[HTTP] {method} {url} -> {resp.status_code}")
            try:
                self.logger.info(f"[HTTP] body: {json.dumps(payload, ensure_ascii=False)[:1000]}")
            except Exception:
                self.logger.info(f"[HTTP] body(raw): {str(payload)[:1000]}")

        if not resp.ok:
            raise HeyGenHTTPError(f"{resp.status_code} {url} {payload}")

        if isinstance(payload, dict):
            code = payload.get("code")
            data = payload.get("data")
            if code not in (None100and not data:
                raise HeyGenHTTPError(f"API error: code={code}, payload={payload}")
            return payload
        raise HeyGenHTTPError(f"Unexpected payload type: {type(payload)}")

    def _guess_mime(selfpathstr) -> str:
        low = path.lower()
        if low.endswith(".mp3"): return "audio/mpeg"
        if low.endswith(".wav"): return "audio/wav"
        if low.endswith(".m4a"): return "audio/mp4"
        if low.endswith(".png"): return "image/png"
        if low.endswith(".jpg"or low.endswith(".jpeg"): return "image/jpeg"
        if low.endswith(".mp4"): return "video/mp4"
        if low.endswith(".webm"): return "video/webm"
        return mimetypes.guess_type(path)[0or "application/octet-stream"

    # 画像/音声/動画アップロード
    def upload_asset(selfpathstrmimeOptional[str] = None) -> Dict:
        ct = mime or self._guess_mime(path)
        if self.logger:
            self.logger.kv("upload_asset", path=path, content_type=ct)
        with open(path, "rb"as f:
            return self._request(
                "POST",
                f"{HEYGEN_UPLOAD_BASE}/v1/asset",
                headers={"Content-Type": ct},
                data=f
            )

    def create_photo_avatar_group(selfnamestrimage_keystr) -> Dict:
        body = {"name": name, "image_key": image_key}
        return self._request(
            "POST",
            f"{HEYGEN_API_BASE}/v2/photo_avatar/avatar_group/create",
            headers={"Content-Type""application/json"},
            json=body
        )

    def get_photo_avatar(selfavatar_idstr) -> Dict:
        return self._request("GET"f"{HEYGEN_API_BASE}/v2/photo_avatar/{avatar_id}")

    def request_video_generate(
        self, *, talking_photo_idstraudio_asset_idstr,
        widthintheightinttitlestr,
        callback_urlOptional[str] = Nonecallback_idOptional[str] = None
    ) -> str:
        body = {
            "title": title,
            "video_inputs": [{
                "character": {"type""talking_photo""talking_photo_id": talking_photo_id},
                "voice": {"type""audio""audio_asset_id": audio_asset_id},
                "background": {"type""color""value""#f6f6fc"}
            }],
            "dimension": {"width": width, "height": height}
        }
        if callback_url: body["callback_url"] = callback_url
        if callback_id:  body["callback_id"] = callback_id

        payload = self._request(
            "POST",
            f"{HEYGEN_API_BASE}/v2/video/generate",
            headers={"Content-Type""application/json"},
            json=body
        )
        data = payload.get("data"or {}
        vid = data.get("video_id")
        if not vid:
            raise HeyGenHTTPError(f"video_id が取得できません: {payload}")
        return vid

    def poll_video_status(
        self,
        video_idstr,
        *,
        timeout_secint = 600,
        interval_secint = 5,
        ensure_urlbool = True,
        post_complete_retryint = 5,
        post_complete_intervalint = 2,
    ) -> Dict:
        """
        status が completed になるまでポーリング。
        ensure_url=True のとき、completed 直後に video_url が None の場合は
        post_complete_retry 回まで 2 次取得(get_video_status)を試みる。
        """
        import time
        start = time.time()
        url = f"{HEYGEN_API_BASE}/v1/video_status.get?video_id={video_id}"
        while True:
            payload = self._request("GET", url)
            data = payload.get("data"or {}
            status = data.get("status")
            if self.logger:
                self.logger.info(f"[status] video_id={video_id} status={status}")
            if status == "completed":
                if ensure_url and not data.get("video_url"):
                    for _ in range(max(0int(post_complete_retry))):
                        time.sleep(max(1int(post_complete_interval)))
                        again = self.get_video_status(video_id)
                        again_data = again.get("data"or {}
                        if again_data.get("video_url"):
                            return again_data
                return data
            if status == "failed":
                raise HeyGenHTTPError(f"Video failed: {data}")
            if time.time() - start > timeout_sec:
                raise TimeoutError(f"Video polling timeout ({timeout_sec}s)")
            time.sleep(interval_sec)

    def get_video_status(selfvideo_idstr) -> Dict:
        """単発の最新ステータス取得(再照会用ユーティリティ)"""
        return self._request("GET"f"{HEYGEN_API_BASE}/v1/video_status.get?video_id={video_id}")

    def delete_photo_avatar(selfavatar_idstr):
        self._request("DELETE"f"{HEYGEN_API_BASE}/v2/photo_avatar/{avatar_id}")

    def delete_photo_avatar_group(selfgroup_idstr):
        self._request("DELETE"f"{HEYGEN_API_BASE}/v2/photo_avatar_group/{group_id}")

    def delete_asset(selfasset_idstr):
        self._request("POST"f"{HEYGEN_API_BASE}/v1/asset/{asset_id}/delete")

    def list_talking_photos(self) -> list[dict]:
        """
        /v2/avatars から talking_photos を取得
        https://api.heygen.com/v2/avatars
        """
        payload = self._request("GET"f"{HEYGEN_API_BASE}/v2/avatars")
        d = payload.get("data"if isinstance(payload, dictelse payload
        tps = (d.get("talking_photos"if isinstance(d, dictelse Noneor []
        if self.logger:
            self.logger.kv("talking_photos_count", count=len(tps))
        return tps

    def prune_talking_photos(self, *, prefixstr = "group_"max_totalint = 3keepint = 0) -> int:
        """
        Free枠を確保するため、prefixで識別できる一時Photo Avatarを古い順に削除。
        - max_total: アカウント全体で許容する総数(Freeは3)
        - keep: prefix対象を何体残すか(0なら全部削除対象)
        戻り値: 削除件数
        """
        tps = self.list_talking_photos()
        if len(tps) < max_total:
            return 0

        candidates = []
        for tp in tps:
            tp_id = tp.get("talking_photo_id"or tp.get("id")
            tp_name = tp.get("talking_photo_name"or tp.get("name"or ""
            if not tp_id or (prefix and not tp_name.startswith(prefix)):
                continue
            detail = self.get_photo_avatar(tp_id)
            data = detail.get("data"or {}
            created = data.get("created_at"or 0
            group_id = data.get("group_id")
            candidates.append({"id": tp_id, "name": tp_name, "group_id": group_id, "created": created})

        candidates.sort(key=lambda x: x["created"or 0)
        removed = 0
        total = len(tps)
        target_remove = max(0, total - max_total + keep)
        for row in candidates:
            if removed >= target_remove:
                break
            try:
                self.delete_photo_avatar(row["id"])
                removed += 1
                if self.logger:
                    self.logger.info(f"[cleanup] deleted talking_photo {row['id']} ({row['name']})")
            except Exception as e:
                if self.logger:
                    self.logger.info(f"[cleanup] delete_photo_avatar failed: {e}")
            if row.get("group_id"):
                try:
                    self.delete_photo_avatar_group(row["group_id"])
                except Exception:
                    pass
        return removed

    def ensure_talking_photo_quota(self, *, name_prefixstr = "group_"max_totalint = 3reserveint = 1):
        """
        新規作成の直前に実行。max_total=3 で reserve=1 なら、
        現在数が3以上なら prefix一致の古いものから削って「新規1体作れる状態」にする。
        """
        removed = self.prune_talking_photos(prefix=name_prefix, max_total=max_total, keep=0)
        if self.logger:
            self.logger.kv("quota_cleanup_removed", removed=removed)

print('✅ HeyGen プロバイダを準備しました')
✅ HeyGen プロバイダを準備しました

Gemini
# === ダウンロードヘルパ ===
import requests
from pathlib import Path

def download_to_file(urlstrdest_pathPathlogger=Nonechunk_sizeint = 1024*256) -> Path:
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    with requests.get(url, stream=True, timeout=180as r:
        r.raise_for_status()
        total = int(r.headers.get("Content-Length""0")) or None
        written = 0
        with open(dest_path, "wb"as f:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if not chunk:
                    continue
                f.write(chunk)
                written += len(chunk)
                if logger and total:
                    pct = int(written * 100 / total)
                    logger.info(f"[download] {written}/{total} bytes ({pct}%)")
    if logger:
        logger.file(str(dest_path), title="saved video")
    return dest_path

Gemini

# パイプライン補助関数
def ensure_api_keys():
    required = ['HEYGEN_API_KEY''ELEVENLABS_API_KEY''GOOGLE_API_KEY']
    missing = [key for key in required if not os.environ.get(key)]
    if missing:
        raise RuntimeError(f'未設定の API キーがあります: {", ".join(missing)}')

def copy_asset(src_pathstrdest_dirPathfilenamestr) -> Path:
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest_path = dest_dir / filename
    shutil.copy2(src_path, dest_path)
    return dest_path

def optimize_text_for_tts(textstr) -> str:
    if not CONFIG['enable_tts_optimization']:
        return text
    cleaned = text.replace('、''、 ').replace('。''。 ').replace(''' ')
    cleaned = ' '.join(cleaned.split())
    return cleaned

print('✅ 補助関数を定義しました')
✅ 補助関数を定義しました

Gemini

Gemini
# === ElevenLabs 音声プリセット ===
import ipywidgets as widgets
from IPython.display import display

# Elevenlabs公式からIDを取得可能です
VOICE_PRESETS = [
    ("Morioki""8EkOjt4xTPGMclNlh1pk"), https://elevenlabs.io/app/voice-library?voiceId=8EkOjt4xTPGMclNlh1pk (※ ご自身でボイスの追加が必要です)
    ("Otani",     "3JDquces8E8bkmvbh6Bc"), # https://elevenlabs.io/app/voice-library?voiceId=3JDquces8E8bkmvbh6Bc
    ("Shizuka",     "WQz3clzUdMqvBf0jswZQ"), # https://elevenlabs.io/app/voice-library?voiceId=WQz3clzUdMqvBf0jswZQ
]

voice_dropdown = widgets.Dropdown(
    options=[(f"{name} — {vid[:6]}…", vid) for name, vid in VOICE_PRESETS],
    value=VOICE_PRESETS[0][1],
    description="ElevenLabs",
    layout=widgets.Layout(width='70%')
)

display(voice_dropdown)

def get_selected_voice_id() -> str:
    """実行セル側から現在の選択値を取得"""
    return voice_dropdown.value

print("✅ 音声の初期値をセットしました。必要ならプルダウンで切り替えてください。")


Gemini
from google.colab import files
from IPython.display import display, HTML
import ipywidgets as widgets

SELECTION = {}

display(HTML("<h4>① 商品画像をアップロード</h4>"))
up1 = files.upload()
if not up1:
    raise RuntimeError("商品画像がアップロードされていません")
name1, data1 = next(iter(up1.items()))
prod_path = ASSETS_DIR / name1
with open(prod_path, "wb"as f: f.write(data1)
SELECTION["product_image"] = str(prod_path)
display(HTML(f"✅ 商品画像: {name1}"))

display(HTML("<h4>② アバター画像をアップロード</h4>"))
up2 = files.upload()
if not up2:
    raise RuntimeError("アバター画像がアップロードされていません")
name2, data2 = next(iter(up2.items()))
avatar_path = ASSETS_DIR / name2
with open(avatar_path, "wb"as f: f.write(data2)
SELECTION["avatar_image"] = str(avatar_path)
display(HTML(f"✅ アバター画像: {name2}"))

Gemini
# === 実行セル ===
import ipywidgets as widgets
from IPython.display import display

product_name_widget = widgets.Text(
    value='Apple Watch Series 9', description='商品名',
    layout=widgets.Layout(width='70%')
)
meta_text_widget = widgets.Textarea(
    value='健康管理とライフスタイル向上を両立したスマートウォッチ。忙しい社会人向けに時短と健康サポートを訴求。',
    description='企画メモ', layout=widgets.Layout(width='80%', height='120px')
)
run_button = widgets.Button(description='パイプライン実行', button_style='success', icon='play')
run_status = widgets.HTML('')

display(widgets.HTML('<h3>実行パラメータ</h3>'))
display(product_name_widget)
display(meta_text_widget)
display(run_button)
display(run_status)

PIPELINE_MANIFEST = {}

def run_pipeline(_):
    try:
        ensure_api_keys()
        if not SELECTION.get('product_image'or not SELECTION.get('avatar_image'):
            run_status.value = '<p style="color:#d9534f;">商品画像とアバター画像の両方をアップロードしてください。</p>'
            return

        logger = ProgressLogger(total_steps=1)
        logger.info("パイプライン開始")

        selected_voice_id = get_selected_voice_id(if 'get_selected_voice_id' in globals() else None
        runner = PipelineRunner(logger=logger, voice_id=selected_voice_id)
        manifest = runner.run(
            product_name=product_name_widget.value.strip(),
            product_image_path=SELECTION['product_image'],
            avatar_image_path=SELECTION['avatar_image'],
            meta_text=meta_text_widget.value.strip()
        )
        PIPELINE_MANIFEST['data'] = manifest

        logger.success('パイプライン完了')
        run_status.value = '<p style="color:#3c763d;">🎉 パイプラインが完了しました。結果表示セルを実行してください。</p>'

    except Exception as e:
        try:
            logger.error(f"Unhandled error: {type(e).__name__}{e}")
        except Exception:
            pass
        run_status.value = f'<p style="color:#d9534f;">❌ エラー: {type(e).__name__}{e}</p>'

run_button.on_click(run_pipeline)


Gemini


Variables Terminal