diff --git a/tensors/cli.py b/tensors/cli.py index 3ed5983..570a88e 100644 --- a/tensors/cli.py +++ b/tensors/cli.py @@ -927,8 +927,7 @@ def generate( # noqa: PLR0915 # files, each task must take the non-JSON path. We render our own JSON # at the end, so the per-task --json is incompatible. console.print( - "[red]--json is not supported with --parallel-queue > 1 " - "(would skip the file-save step). Drop one or the other.[/red]" + "[red]--json is not supported with --parallel-queue > 1 (would skip the file-save step). Drop one or the other.[/red]" ) raise typer.Exit(1) # ---- --input merging (JSON or YAML) ---- @@ -1086,7 +1085,7 @@ def generate( # noqa: PLR0915 with Database() as _db: _db.init_schema() _base_model = _db.get_base_model_by_filename(model) - except Exception: # noqa: BLE001 + except Exception: pass _detected = detect_model_family(model, _base_model) _fam = family or _detected @@ -1099,10 +1098,7 @@ def generate( # noqa: PLR0915 # --seed >= 0 → use as base, increment per job (reproducible series) # --seed == -1 → pick a fresh random seed PER JOB so parallel runs aren't # accidentally correlated (each thread gets variety) - if seed >= 0: - seeds = [seed + i for i in range(count)] - else: - seeds = [_rng.randint(0, 2**32 - 1) for _ in range(count)] + seeds = [seed + i for i in range(count)] if seed >= 0 else [_rng.randint(0, 2**32 - 1) for _ in range(count)] # Output paths: mirror the existing `count > 1` naming convention from # _run_generation (stem_NNN.ext). When --output is omitted, leave per-task @@ -1117,8 +1113,7 @@ def generate( # noqa: PLR0915 if not json_output: console.print( - f"[dim]Parallel queue: {effective_parallel} concurrent submissions " - f"× {count} images (output may interleave)[/dim]" + f"[dim]Parallel queue: {effective_parallel} concurrent submissions x {count} images (output may interleave)[/dim]" ) common_kwargs: dict[str, Any] = { @@ -1171,7 +1166,7 @@ def generate( # noqa: PLR0915 except typer.Exit as ex: result["duration_sec"] = round(_time.perf_counter() - start, 2) result["error"] = f"generate exited with code {ex.exit_code}" - except Exception as ex: # noqa: BLE001 + except Exception as ex: result["duration_sec"] = round(_time.perf_counter() - start, 2) result["error"] = str(ex) return result @@ -1179,12 +1174,13 @@ def generate( # noqa: PLR0915 fan_results: list[dict[str, Any]] = [] with ThreadPoolExecutor(max_workers=effective_parallel) as pool: futures = {pool.submit(_run_one, i): i for i in range(count)} - completed = 0 - for fut in as_completed(futures): - completed += 1 + for completed, fut in enumerate(as_completed(futures), start=1): try: res = fut.result() - except Exception as ex: # noqa: BLE001 — defensive; _run_one already swallows + except Exception as ex: + # Defensive — _run_one already swallows, but if the executor itself + # raises (e.g. pickling failure) we still want a well-formed result + # in the manifest rather than a crash. res = { "index": futures[fut], "seed": seeds[futures[fut]], @@ -1198,13 +1194,10 @@ def generate( # noqa: PLR0915 if res["success"]: where = res["output"] or "(no --output set)" console.print( - f"[green]\\[{completed}/{count}] seed={res['seed']} " - f"ok in {res['duration_sec']:.1f}s → {where}[/green]" + f"[green]\\[{completed}/{count}] seed={res['seed']} ok in {res['duration_sec']:.1f}s → {where}[/green]" ) else: - console.print( - f"[red]\\[{completed}/{count}] seed={res['seed']} FAIL: {res['error']}[/red]" - ) + console.print(f"[red]\\[{completed}/{count}] seed={res['seed']} FAIL: {res['error']}[/red]") # Reorder by original index so JSON output / final summary list is stable. fan_results.sort(key=lambda r: r["index"]) diff --git a/tests/test_generate_parallel.py b/tests/test_generate_parallel.py index c3601d1..c5efbc7 100644 --- a/tests/test_generate_parallel.py +++ b/tests/test_generate_parallel.py @@ -2,7 +2,6 @@ from __future__ import annotations -import json from pathlib import Path from typing import Any @@ -258,9 +257,7 @@ def test_parallel_queue_from_yaml_input(tmp_path: Path, calls: list[dict[str, An """parallel_queue can be set via --input YAML (mirrors other generate params).""" out = tmp_path / "img.png" yml = tmp_path / "spec.yml" - yml.write_text( - f'prompt: from-yaml\nmodel: x.safetensors\ncount: 3\nparallel_queue: 3\nseed: 7\noutput: "{out}"\n' - ) + yml.write_text(f'prompt: from-yaml\nmodel: x.safetensors\ncount: 3\nparallel_queue: 3\nseed: 7\noutput: "{out}"\n') result = runner.invoke(app, ["generate", "--input", str(yml)]) assert result.exit_code == 0, result.output assert len(calls) == 3 @@ -271,9 +268,7 @@ def test_cli_parallel_queue_overrides_yaml(tmp_path: Path, calls: list[dict[str, """CLI --parallel-queue wins over YAML's parallel_queue (standard precedence).""" out = tmp_path / "img.png" yml = tmp_path / "spec.yml" - yml.write_text( - f'prompt: from-yaml\nmodel: x.safetensors\ncount: 2\nparallel_queue: 1\nseed: 10\noutput: "{out}"\n' - ) + yml.write_text(f'prompt: from-yaml\nmodel: x.safetensors\ncount: 2\nparallel_queue: 1\nseed: 10\noutput: "{out}"\n') # YAML says P=1 (sequential), CLI overrides to P=2 (fanout) result = runner.invoke(app, ["generate", "--input", str(yml), "-P", "2"]) assert result.exit_code == 0, result.output