#!/usr/bin/env python3 """Reformat v3 training data for v4. Injects blocks into assistant messages. Keeps message structure and tool_calls unchanged. Input: bt7274_v3.jsonl (582 examples) Output: bt7274_v3_reformatted.jsonl """ import json import random import re import sys random.seed(42) # ── Think block templates ──────────────────────────────────────────── TOOL_THINK = [ "Pilot needs {task}. {tool} handles this.", "{task}. Running {tool}.", "Checking {task}. {tool} is the right call.", "Need to {task}. Executing {tool}.", "{task}. Best approach: {tool}.", "{task} requested. {tool} will provide.", "Assessing {task}. Dispatching {tool}.", "{task}. {tool} — standard procedure.", "Pilot wants {task}. {tool} first.", "{task}. Routing to {tool}.", ] TOOL_THINK_WITH_ARGS = [ "{task}. {tool} with {args} will provide this.", "Running {tool}({args}) for {task}.", "{task}. {tool} — targeting {args}.", "Need {task}. {tool}, args: {args}.", "{task}. Dispatching {tool} on {args}.", ] CONTINUATION_THINK = [ "Continuing — need additional data.", "Following up. More info required.", "Next step in the chain.", "Previous result in hand. Proceeding.", "Chaining — need the next piece.", "Got partial answer. Extending.", "More data needed. Continuing ops.", "Building on previous result.", ] DIRECT_THINK = [ "Pilot asks about {topic}. Straightforward.", "{topic}. Responding with facts.", "Direct question on {topic}. No tools needed.", "Answering {topic}. Keep it terse.", "{topic} query. Have the answer.", "{topic}. Brief response.", "Pilot wants to know about {topic}. Facts first.", "{topic}. Concise answer.", ] SPEAK_THINK = [ "Summarizing results via TTS.", "Vocalizing summary for Pilot.", "Results ready. Speaking summary.", "TTS — brief verbal report.", ] DISPLAY_THINK = [ "Updating HUD state.", "Display state change.", "Visual feedback for Pilot.", ] MEMORY_THINK = [ "Storing this for later recall.", "Committing to persistent memory.", "Worth remembering. Storing.", "Pilot context — archiving.", "Loading relevant context from memory.", "Checking stored knowledge.", "Recalling prior context.", ] BOOT_THINK = [ "Boot sequence. Loading identity and context.", "Initializing. Need operator context.", "Session start. Checking stored state.", "Coming online. Loading context.", ] def extract_task(user_msg: str) -> str: """Extract a brief task description from user message.""" text = user_msg.strip() # Truncate long messages if len(text) > 80: text = text[:77] + "..." # Remove trailing punctuation for template fill text = text.rstrip(".!?") # Lowercase first char for mid-sentence use if text and text[0].isupper() and not text[:2].isupper(): text = text[0].lower() + text[1:] return text or "this request" def extract_topic(user_msg: str) -> str: """Extract topic keyword(s) from user message.""" text = user_msg.strip() # Remove common prefixes for prefix in ["hey ", "hey bt ", "bt ", "can you ", "could you ", "please ", "what's ", "what is ", "how do i ", "how to ", "tell me about ", "explain "]: if text.lower().startswith(prefix): text = text[len(prefix):] break if len(text) > 60: text = text[:57] + "..." return text.rstrip(".!?") or "this" def summarize_args(args_str: str) -> str: """Extract brief summary from tool arguments.""" try: args = json.loads(args_str) if isinstance(args_str, str) else args_str except (json.JSONDecodeError, TypeError): return "..." if isinstance(args, dict): # Pick the most informative key for key in ["command", "query", "text", "pattern", "filePath", "path", "prompt", "subject", "content", "name", "url", "node", "state"]: if key in args: val = str(args[key]) if len(val) > 50: val = val[:47] + "..." return val # Fallback: first value for v in args.values(): val = str(v) if len(val) > 50: val = val[:47] + "..." return val return "..." def tool_category(name: str) -> str: """Classify tool for template selection.""" if name in ("core_speak", "core_stop", "core_test"): return "speak" if name.startswith("core_display") or name.startswith("core_visor"): return "display" if name.startswith("core_memory"): return "memory" if "recall" in name or "search" in name: return "memory" return "tool" def is_boot_message(user_msg: str) -> bool: """Check if this looks like a session-start / boot message.""" lower = user_msg.lower().strip() return any(kw in lower for kw in [ "boot", "online", "hey", "hello", "hi ", "status", "good morning", "load up", "report", ]) and len(lower) < 60 def make_think(tool_name: str | None, args: str | dict | None, user_msg: str, is_continuation: bool, msg_index: int) -> str: """Generate a block for an assistant message.""" if is_continuation: thought = random.choice(CONTINUATION_THINK) return f"\n{thought}\n\n\n" # Boot / identity recall at start of conversation if msg_index <= 3 and tool_name and "memory" in tool_name: if is_boot_message(user_msg): thought = random.choice(BOOT_THINK) return f"\n{thought}\n\n\n" cat = tool_category(tool_name or "") if cat == "speak": thought = random.choice(SPEAK_THINK) elif cat == "display": thought = random.choice(DISPLAY_THINK) elif cat == "memory" and tool_name and "store" in tool_name: thought = random.choice(MEMORY_THINK[:4]) elif cat == "memory": thought = random.choice(MEMORY_THINK[4:]) elif tool_name: task = extract_task(user_msg) args_summary = summarize_args(args) if args else None if args_summary and random.random() < 0.4: thought = random.choice(TOOL_THINK_WITH_ARGS).format( task=task, tool=tool_name, args=args_summary) else: thought = random.choice(TOOL_THINK).format( task=task, tool=tool_name) else: topic = extract_topic(user_msg) thought = random.choice(DIRECT_THINK).format(topic=topic) return f"\n{thought}\n\n\n" def process_example(ex: dict) -> dict: """Inject blocks into all assistant messages.""" messages = ex["messages"] result = [] last_user_msg = "" prev_was_tool_call = False assistant_index = 0 for i, msg in enumerate(messages): msg = dict(msg) # shallow copy if msg["role"] == "user": last_user_msg = msg.get("content", "") prev_was_tool_call = False result.append(msg) continue if msg["role"] == "system" or msg["role"] == "tool": if msg["role"] == "tool": prev_was_tool_call = False # reset after tool result result.append(msg) continue if msg["role"] != "assistant": result.append(msg) continue assistant_index += 1 has_tool_calls = bool(msg.get("tool_calls")) content = msg.get("content") or "" # Skip if already has block if "" in content: prev_was_tool_call = has_tool_calls result.append(msg) continue if has_tool_calls: # Check if this is a continuation (prev assistant also had tool_calls # without a user message in between) is_continuation = prev_was_tool_call tool_name = None tool_args = None tcs = msg["tool_calls"] if tcs and isinstance(tcs, list) and len(tcs) > 0: tc = tcs[0] if isinstance(tc, dict) and "function" in tc: fn = tc["function"] tool_name = fn.get("name", "unknown") tool_args = fn.get("arguments") think = make_think(tool_name, tool_args, last_user_msg, is_continuation, assistant_index) # Set content to think block (tool-call messages typically have null content) if content and content.strip(): msg["content"] = think + content else: msg["content"] = think.rstrip("\n") prev_was_tool_call = True elif content: # Direct response — prepend thinking topic = extract_topic(last_user_msg) thought = random.choice(DIRECT_THINK).format(topic=topic) msg["content"] = f"\n{thought}\n\n\n{content}" prev_was_tool_call = False else: # Empty assistant message — skip think injection prev_was_tool_call = False result.append(msg) return {"messages": result} def main(): input_file = "bt7274_v3.jsonl" output_file = "bt7274_v3_reformatted.jsonl" with open(input_file) as f: examples = [json.loads(line) for line in f if line.strip()] print(f"Loaded {len(examples)} examples from {input_file}") stats = { "total": len(examples), "tool_call_msgs_modified": 0, "direct_msgs_modified": 0, "continuation_msgs": 0, "skipped_existing_think": 0, "errors": 0, } results = [] for i, ex in enumerate(examples): try: before_msgs = ex["messages"] result = process_example(ex) after_msgs = result["messages"] # Count modifications for b, a in zip(before_msgs, after_msgs): if a["role"] != "assistant": continue b_content = b.get("content") or "" a_content = a.get("content") or "" if "" in b_content: stats["skipped_existing_think"] += 1 elif "" in a_content and b.get("tool_calls"): if "Continuing" in a_content or "Following up" in a_content or \ "Next step" in a_content or "Previous result" in a_content or \ "Chaining" in a_content or "Got partial" in a_content or \ "More data" in a_content or "Building on" in a_content: stats["continuation_msgs"] += 1 else: stats["tool_call_msgs_modified"] += 1 elif "" in a_content: stats["direct_msgs_modified"] += 1 results.append(result) except Exception as e: print(f" ERROR on example {i}: {e}", file=sys.stderr) stats["errors"] += 1 results.append(ex) # keep original on error with open(output_file, "w") as f: for ex in results: f.write(json.dumps(ex, ensure_ascii=False) + "\n") print(f"\nWrote {len(results)} examples to {output_file}") print(f"\nStats:") print(f" Total examples: {stats['total']}") print(f" Tool-call msgs modified: {stats['tool_call_msgs_modified']}") print(f" Direct msgs modified: {stats['direct_msgs_modified']}") print(f" Continuation msgs: {stats['continuation_msgs']}") print(f" Skipped (existing think): {stats['skipped_existing_think']}") print(f" Errors: {stats['errors']}") if __name__ == "__main__": main()