358 lines
12 KiB
Python
358 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Reformat v3 training data for v4.
|
|
|
|
Injects <think> blocks into assistant messages.
|
|
Keeps message structure and tool_calls unchanged.
|
|
|
|
Input: bt7274_v3.jsonl (582 examples)
|
|
Output: bt7274_v3_reformatted.jsonl
|
|
"""
|
|
|
|
import json
|
|
import random
|
|
import re
|
|
import sys
|
|
|
|
# Fixed seed so repeated runs pick the same templates.
random.seed(42)

# ── Think block templates ────────────────────────────────────────────
# Each list holds interchangeable one-line "thoughts"; one entry is drawn
# with random.choice() and wrapped in <think>…</think>.

# Tool call, filled with {task} and {tool}.
TOOL_THINK = [
    "Pilot needs {task}. {tool} handles this.",
    "{task}. Running {tool}.",
    "Checking {task}. {tool} is the right call.",
    "Need to {task}. Executing {tool}.",
    "{task}. Best approach: {tool}.",
    "{task} requested. {tool} will provide.",
    "Assessing {task}. Dispatching {tool}.",
    "{task}. {tool} — standard procedure.",
    "Pilot wants {task}. {tool} first.",
    "{task}. Routing to {tool}.",
]

# Tool call, filled with {task}, {tool} and a short {args} summary.
TOOL_THINK_WITH_ARGS = [
    "{task}. {tool} with {args} will provide this.",
    "Running {tool}({args}) for {task}.",
    "{task}. {tool} — targeting {args}.",
    "Need {task}. {tool}, args: {args}.",
    "{task}. Dispatching {tool} on {args}.",
]

# Follow-up tool call within the same turn (no placeholders).
CONTINUATION_THINK = [
    "Continuing — need additional data.",
    "Following up. More info required.",
    "Next step in the chain.",
    "Previous result in hand. Proceeding.",
    "Chaining — need the next piece.",
    "Got partial answer. Extending.",
    "More data needed. Continuing ops.",
    "Building on previous result.",
]

# Plain-text answer without a tool call, filled with {topic}.
DIRECT_THINK = [
    "Pilot asks about {topic}. Straightforward.",
    "{topic}. Responding with facts.",
    "Direct question on {topic}. No tools needed.",
    "Answering {topic}. Keep it terse.",
    "{topic} query. Have the answer.",
    "{topic}. Brief response.",
    "Pilot wants to know about {topic}. Facts first.",
    "{topic}. Concise answer.",
]

# Tools that tool_category() classifies as "speak".
SPEAK_THINK = [
    "Summarizing results via TTS.",
    "Vocalizing summary for Pilot.",
    "Results ready. Speaking summary.",
    "TTS — brief verbal report.",
]

# Tools that tool_category() classifies as "display".
DISPLAY_THINK = [
    "Updating HUD state.",
    "Display state change.",
    "Visual feedback for Pilot.",
]

# Memory tools. ORDER MATTERS: make_think() slices this list, using the
# first four entries for "store" tools and the remaining ones otherwise.
MEMORY_THINK = [
    "Storing this for later recall.",
    "Committing to persistent memory.",
    "Worth remembering. Storing.",
    "Pilot context — archiving.",
    "Loading relevant context from memory.",
    "Checking stored knowledge.",
    "Recalling prior context.",
]

# Memory tool call early in a conversation that opens with a boot-style
# user message.
BOOT_THINK = [
    "Boot sequence. Loading identity and context.",
    "Initializing. Need operator context.",
    "Session start. Checking stored state.",
    "Coming online. Loading context.",
]
|
|
|
|
|
|
def extract_task(user_msg: str) -> str:
    """Condense a user message into a short phrase for the tool templates.

    The message is trimmed, capped in length, stripped of trailing
    ``.``/``!``/``?`` and given a lowercase first letter so it reads
    naturally mid-sentence. A message whose first two characters are
    upper-case (per ``str.isupper``) keeps its capitalisation.

    Returns ``"this request"`` when nothing usable is left.
    """
    task = user_msg.strip()

    if len(task) > 80:
        # The ellipsis consists only of "." characters, so the rstrip
        # below removes it again; the net effect is a 77-character cut.
        task = f"{task[:77]}..."

    task = task.rstrip(".!?")
    if not task:
        return "this request"

    first_char = task[0]
    keeps_capitals = task[:2].isupper()
    if first_char.isupper() and not keeps_capitals:
        task = first_char.lower() + task[1:]
    return task
|
|
|
|
|
|
def extract_topic(user_msg: str) -> str:
    """Extract a short topic phrase from a user message.

    At most one leading filler phrase (greeting / "can you" / "what is" …)
    is removed, matched case-insensitively. The remainder is capped in
    length and stripped of trailing ``.``/``!``/``?``.

    Args:
        user_msg: Raw text of the user message.

    Returns:
        The topic phrase, or ``"this"`` when nothing is left.
    """
    text = user_msg.strip()

    # Only the first matching prefix is removed, so a prefix that extends
    # another one must be listed first: "hey bt " has to precede "hey ",
    # otherwise "hey " always wins and "bt" leaks into the topic.
    prefixes = (
        "hey bt ", "hey ", "bt ", "can you ", "could you ",
        "please ", "what's ", "what is ", "how do i ",
        "how to ", "tell me about ", "explain ",
    )
    lowered = text.lower()
    for prefix in prefixes:
        if lowered.startswith(prefix):
            text = text[len(prefix):]
            break

    if len(text) > 60:
        # The ellipsis consists only of "." characters, so the rstrip
        # below removes it again; the net effect is a 57-character cut.
        text = text[:57] + "..."

    return text.rstrip(".!?") or "this"
|
|
|
|
|
|
def summarize_args(args_str: str) -> str:
    """Return a short, human-readable summary of a tool call's arguments.

    ``args_str`` may be a JSON string or an already-decoded object. When it
    resolves to a dict, the value of the first well-known key present is
    used; otherwise the dict's first value. The chosen value is rendered
    with ``str()`` and clipped to 50 characters. Anything that cannot be
    summarised yields ``"..."``.
    """
    if isinstance(args_str, str):
        try:
            parsed = json.loads(args_str)
        except (json.JSONDecodeError, TypeError):
            return "..."
    else:
        parsed = args_str

    if not isinstance(parsed, dict):
        return "..."

    # Keys ordered from most to least informative.
    preferred_keys = (
        "command", "query", "text", "pattern", "filePath",
        "path", "prompt", "subject", "content", "name",
        "url", "node", "state",
    )
    matched_key = next((k for k in preferred_keys if k in parsed), None)

    if matched_key is not None:
        chosen = parsed[matched_key]
    elif parsed:
        # No preferred key present: fall back to the first value.
        chosen = next(iter(parsed.values()))
    else:
        return "..."

    rendered = str(chosen)
    return rendered if len(rendered) <= 50 else rendered[:47] + "..."
|
|
|
|
|
|
def tool_category(name: str) -> str:
    """Map a tool name to the template family used for its think block.

    Returns one of ``"speak"``, ``"display"``, ``"memory"`` or the
    catch-all ``"tool"``.
    """
    if name in {"core_speak", "core_stop", "core_test"}:
        return "speak"

    if name.startswith(("core_display", "core_visor")):
        return "display"

    looks_like_lookup = "recall" in name or "search" in name
    if name.startswith("core_memory") or looks_like_lookup:
        return "memory"

    return "tool"
|
|
|
|
|
|
# Keywords that mark a boot message wherever they occur in the text.
_BOOT_SUBSTRINGS = (
    "boot", "online", "hello", "status",
    "good morning", "load up", "report",
)

# Short greetings must match as whole words: plain substring tests fire
# inside "they" / "this ", and a "hi " literal can never match a bare "hi"
# or "hi," once the message has been stripped.
_BOOT_GREETING_RE = re.compile(r"\b(?:hey|hi)\b")


def is_boot_message(user_msg: str) -> bool:
    """Check whether a user message looks like a session-start / boot message.

    A message qualifies when it is shorter than 60 characters (after
    trimming) and either contains one of the boot keywords or contains
    "hey" / "hi" as a standalone word. Matching is case-insensitive.

    Args:
        user_msg: Raw text of the user message.

    Returns:
        True if the message looks like a greeting or boot request.
    """
    lower = user_msg.lower().strip()
    if len(lower) >= 60:
        return False
    if any(kw in lower for kw in _BOOT_SUBSTRINGS):
        return True
    return _BOOT_GREETING_RE.search(lower) is not None
|
|
|
|
|
|
def make_think(tool_name: str | None, args: str | dict | None,
               user_msg: str, is_continuation: bool,
               msg_index: int) -> str:
    """Build the ``<think>`` block for one assistant message.

    Template family, in order of precedence:

    1. continuation of a tool chain;
    2. boot: a memory tool within the first three assistant messages,
       triggered by a boot-style user message;
    3. by tool category (speak / display / memory store / memory other);
    4. generic tool call, optionally mentioning the arguments;
    5. direct answer when there is no tool.

    Args:
        tool_name: Name of the called tool, or None for a direct answer.
        args: Tool arguments as a JSON string or decoded object, if any.
        user_msg: Most recent user message text.
        is_continuation: True when this call follows another tool call.
        msg_index: 1-based index of this assistant message.

    Returns:
        The block text, ending with a blank line so content can follow.
    """
    if is_continuation:
        thought = random.choice(CONTINUATION_THINK)
    elif (msg_index <= 3 and tool_name and "memory" in tool_name
            and is_boot_message(user_msg)):
        thought = random.choice(BOOT_THINK)
    else:
        category = tool_category(tool_name or "")
        if category == "speak":
            thought = random.choice(SPEAK_THINK)
        elif category == "display":
            thought = random.choice(DISPLAY_THINK)
        elif category == "memory":
            # MEMORY_THINK lists its four "store" phrasings first.
            storing = bool(tool_name) and "store" in tool_name
            pool = MEMORY_THINK[:4] if storing else MEMORY_THINK[4:]
            thought = random.choice(pool)
        elif tool_name:
            task = extract_task(user_msg)
            summary = summarize_args(args) if args else None
            # The random draw only happens when a non-empty summary exists.
            if summary and random.random() < 0.4:
                template = random.choice(TOOL_THINK_WITH_ARGS)
                thought = template.format(
                    task=task, tool=tool_name, args=summary)
            else:
                template = random.choice(TOOL_THINK)
                thought = template.format(task=task, tool=tool_name)
        else:
            subject = extract_topic(user_msg)
            thought = random.choice(DIRECT_THINK).format(topic=subject)

    return f"<think>\n{thought}\n</think>\n\n"
|
|
|
|
|
|
def process_example(ex: dict) -> dict:
    """Inject ``<think>`` blocks into all assistant messages of one example.

    Messages are shallow-copied, so ``ex`` and its message dicts are not
    modified. Non-assistant messages, assistant messages that already
    contain ``<think>``, and empty assistant messages pass through as-is.

    Args:
        ex: Training example with a ``"messages"`` list of role dicts.

    Returns:
        A new dict with every top-level key of ``ex`` preserved and
        ``"messages"`` replaced by the processed list.

    Raises:
        KeyError: If ``"messages"`` or a message's ``"role"`` is missing.
    """
    result = []
    last_user_msg = ""
    prev_was_tool_call = False
    assistant_index = 0  # 1-based count of assistant messages seen

    for original in ex["messages"]:
        msg = dict(original)  # shallow copy; nested values are shared
        role = msg["role"]

        if role == "user":
            # "content" may be present but null; treat that as empty text.
            last_user_msg = msg.get("content") or ""
            prev_was_tool_call = False
            result.append(msg)
            continue

        if role == "tool":
            # NOTE(review): resetting here means a tool call that follows a
            # tool *result* is never treated as a continuation; only
            # back-to-back assistant tool-call messages are. Confirm that
            # this is intended before changing it.
            prev_was_tool_call = False
            result.append(msg)
            continue

        if role != "assistant":
            # System and any other role: pass through untouched.
            result.append(msg)
            continue

        assistant_index += 1
        has_tool_calls = bool(msg.get("tool_calls"))
        content = msg.get("content") or ""

        if "<think>" in content:
            # Already reformatted; keep it, but remember the tool call.
            prev_was_tool_call = has_tool_calls
            result.append(msg)
            continue

        if has_tool_calls:
            # Only the first tool call drives template selection.
            tool_name = None
            tool_args = None
            calls = msg["tool_calls"]
            first_call = calls[0] if isinstance(calls, list) else None
            if isinstance(first_call, dict) and "function" in first_call:
                fn = first_call["function"]
                tool_name = fn.get("name", "unknown")
                tool_args = fn.get("arguments")

            think = make_think(tool_name, tool_args, last_user_msg,
                               prev_was_tool_call, assistant_index)

            # Tool-call messages typically have null content; in that case
            # the think block becomes the content, without trailing
            # newlines.
            if content.strip():
                msg["content"] = think + content
            else:
                msg["content"] = think.rstrip("\n")
            prev_was_tool_call = True

        elif content:
            # Direct response: make_think without a tool name produces the
            # direct-answer template.
            think = make_think(None, None, last_user_msg,
                               False, assistant_index)
            msg["content"] = think + content
            prev_was_tool_call = False

        else:
            # Empty assistant message: nothing to annotate.
            prev_was_tool_call = False

        result.append(msg)

    # Keep sibling keys (tool schemas, ids, …) alongside the new messages.
    return {**ex, "messages": result}
|
|
|
|
|
|
def main(input_file: str = "bt7274_v3.jsonl",
         output_file: str = "bt7274_v3_reformatted.jsonl") -> None:
    """Reformat a JSONL dataset and print a summary of what changed.

    Reads one JSON example per non-blank line of ``input_file``, runs
    ``process_example`` on each and writes the results to ``output_file``.
    An example that fails to process is reported on stderr and written
    back unchanged.

    Args:
        input_file: Path of the source JSONL file.
        output_file: Path of the JSONL file to (over)write.
    """
    # Explicit UTF-8: the output is written with ensure_ascii=False and the
    # think templates contain non-ASCII characters, so the platform default
    # encoding is not safe.
    with open(input_file, encoding="utf-8") as f:
        examples = [json.loads(line) for line in f if line.strip()]

    print(f"Loaded {len(examples)} examples from {input_file}")

    stats = {
        "total": len(examples),
        "tool_call_msgs_modified": 0,
        "direct_msgs_modified": 0,
        "continuation_msgs": 0,
        "skipped_existing_think": 0,
        "errors": 0,
    }

    results = []
    for i, ex in enumerate(examples):
        try:
            before_msgs = ex["messages"]
            result = process_example(ex)
            after_msgs = result["messages"]

            # Classify every assistant message by comparing before/after.
            for b, a in zip(before_msgs, after_msgs):
                if a["role"] != "assistant":
                    continue
                b_content = b.get("content") or ""
                a_content = a.get("content") or ""
                if "<think>" in b_content:
                    stats["skipped_existing_think"] += 1
                elif "<think>" in a_content and b.get("tool_calls"):
                    # Compare the injected thought itself rather than
                    # searching the whole content, so the user's own
                    # wording cannot be mistaken for a continuation.
                    thought = (a_content.partition("<think>\n")[2]
                               .partition("\n</think>")[0])
                    if thought in CONTINUATION_THINK:
                        stats["continuation_msgs"] += 1
                    else:
                        stats["tool_call_msgs_modified"] += 1
                elif "<think>" in a_content:
                    stats["direct_msgs_modified"] += 1

            results.append(result)

        except Exception as e:
            # Best effort at the script boundary: report the failure and
            # keep the original example so the dataset stays complete.
            print(f" ERROR on example {i}: {e}", file=sys.stderr)
            stats["errors"] += 1
            results.append(ex)  # keep original on error

    with open(output_file, "w", encoding="utf-8") as f:
        for ex in results:
            f.write(json.dumps(ex, ensure_ascii=False) + "\n")

    print(f"\nWrote {len(results)} examples to {output_file}")
    print("\nStats:")
    print(f" Total examples: {stats['total']}")
    print(f" Tool-call msgs modified: {stats['tool_call_msgs_modified']}")
    print(f" Direct msgs modified: {stats['direct_msgs_modified']}")
    print(f" Continuation msgs: {stats['continuation_msgs']}")
    print(f" Skipped (existing think): {stats['skipped_existing_think']}")
    print(f" Errors: {stats['errors']}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|