Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 184 additions & 51 deletions src/sigmaker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,9 @@ class SigMakerConfig:
# Seconds before first prompt. -1 (or 0) disables the periodic
# "Continue?" popup -- the wait-box Cancel button still works.
prompt_interval: int = -1
# Issue #22: when True, cancelling a unique-signature search emits the
# partial signature with its match count instead of nothing. Default off.
output_partial_on_cancel: bool = False


@dataclasses.dataclass(slots=True, frozen=True, repr=False)
Expand Down Expand Up @@ -638,6 +641,34 @@ class Action(enum.IntEnum):
SEARCH = 3


class GenerationStatus(enum.Enum):
"""How a GeneratedSignature should be interpreted."""

UNIQUE = "unique"
PARTIAL_ON_CANCEL = "partial_on_cancel"


@dataclasses.dataclass(frozen=True)
class GenerationPolicy:
"""Behavioral knobs passed into generator strategies.

Callers pick the policy that matches their tolerance for non-unique
signatures. The default is strict (legacy behavior: cancel raises).
"""

return_partial_on_cancel: bool = False

@classmethod
def strict(cls) -> "GenerationPolicy":
"""Cancel raises UserCanceledError (legacy behavior)."""
return cls(return_partial_on_cancel=False)

@classmethod
def permissive(cls) -> "GenerationPolicy":
"""Cancel returns a partial GeneratedSignature instead of raising."""
return cls(return_partial_on_cancel=True)


class SignatureByte(typing.NamedTuple):
"""Container representing a single byte in a signature.

Expand Down Expand Up @@ -975,14 +1006,37 @@ class GeneratedSignature:

signature: Signature
address: Match | None = None
status: GenerationStatus = GenerationStatus.UNIQUE
match_count: int | None = None

def display(self, cfg: SigMakerConfig) -> None:
"""Display the signature result to the user."""
"""Display the signature result to the user.

UNIQUE: prints the formatted signature and copies to the clipboard.
PARTIAL_ON_CANCEL: prints the partial with its match count; does NOT
touch the clipboard so an accidental cancel cannot clobber the
user's clipboard contents.
"""
if not self.signature:
idaapi.msg("Error: Empty signature\n")
return
t = cfg.output_format.value
fmted = format(self.signature, t)

if self.status == GenerationStatus.PARTIAL_ON_CANCEL:
count_str = (
f"{self.match_count} matches"
if self.match_count is not None
else "match count unavailable"
)
prefix = (
f"Partial signature (NOT unique, {count_str}) for {self.address}"
if self.address is not None
else f"Partial signature (NOT unique, {count_str})"
)
idaapi.msg(f"{prefix}: {fmted}\n")
return

if self.address is not None:
idaapi.msg(f"Signature for {self.address}: {fmted}\n")
else:
Expand Down Expand Up @@ -1238,7 +1292,11 @@ class InstructionWalker:
"""

start_ea: int
end_ea: int = idaapi.BADADDR
# Resolve BADADDR lazily so tests that patch `idaapi.BADADDR` at runtime
# actually take effect. With `default=idaapi.BADADDR`, the value was
# evaluated at class-definition (module import) time, which under the
# unit-test mock of `idaapi` froze it as a MagicMock attribute.
end_ea: int = dataclasses.field(default_factory=lambda: idaapi.BADADDR)

# Internal state fields
cursor: int = dataclasses.field(init=False)
Expand Down Expand Up @@ -1287,19 +1345,33 @@ def __init__(
self.processor = processor
self.progress_reporter = progress_reporter

def generate(self, ea: int, cfg: SigMakerConfig) -> Signature:
def generate(
self,
ea: int,
cfg: SigMakerConfig,
*,
policy: GenerationPolicy = GenerationPolicy.strict(),
) -> GeneratedSignature:
"""Generate a unique signature starting at the given address.

Args:
ea: Starting address for signature generation
cfg: Configuration for signature generation
policy: Controls cancel-time behavior. Default strict() raises
UserCanceledError on cancel (legacy contract). permissive()
returns a partial GeneratedSignature with the most recently
observed match count.

Returns:
A unique signature
A GeneratedSignature. status=UNIQUE on success;
status=PARTIAL_ON_CANCEL when policy.return_partial_on_cancel
is True and the user cancels after at least one byte has been
appended.

Raises:
Unexpected: If signature cannot be made unique
UserCanceledError: If user cancels via progress reporter
UserCanceledError: If user cancels and policy.return_partial_on_cancel
is False, or if cancel happens before any byte is appended.
"""
if not is_address_marked_as_code(ea):
raise Unexpected("Cannot create code signature for data")
Expand All @@ -1308,51 +1380,88 @@ def generate(self, ea: int, cfg: SigMakerConfig) -> Signature:
start_fn = idaapi.get_func(ea)
bytes_since_last_check = 0
instruction_count = 0
last_match_count: int | None = None

for cur_ea, ins, ins_len in InstructionWalker(ea):
# Check for cancellation via progress reporter
if self.progress_reporter is not None and self.progress_reporter.should_cancel():
def _build_partial() -> GeneratedSignature:
# Trim trailing wildcards, mirror the success path.
sig.trim_signature()
if len(sig) == 0:
raise UserCanceledError("Signature generation canceled by user")
return GeneratedSignature(
sig,
Match(ea),
status=GenerationStatus.PARTIAL_ON_CANCEL,
match_count=last_match_count,
)

# Update progress periodically
instruction_count += 1
progress_reporting = self.progress_reporter is not None and self.progress_reporter.enabled()
if progress_reporting and instruction_count % 100 == 0:
self.progress_reporter.report_progress(
message=f"Generating signature at {hex(cur_ea)}",
signature_length=len(sig),
instructions_processed=instruction_count,
)
try:
for cur_ea, ins, ins_len in InstructionWalker(ea):
# Check for cancellation via progress reporter
if self.progress_reporter is not None and self.progress_reporter.should_cancel():
if policy.return_partial_on_cancel:
return _build_partial()
raise UserCanceledError("Signature generation canceled by user")

# Update progress periodically
instruction_count += 1
progress_reporting = self.progress_reporter is not None and self.progress_reporter.enabled()
if progress_reporting and instruction_count % 100 == 0:
self.progress_reporter.report_progress(
message=f"Generating signature at {hex(cur_ea)}",
signature_length=len(sig),
instructions_processed=instruction_count,
)

# Check length constraint
if bytes_since_last_check > cfg.max_single_signature_length:
# Check length constraint
if bytes_since_last_check > cfg.max_single_signature_length:
if (
not cfg.ask_longer_signature
or idaapi.ask_yn(
idaapi.ASKBTN_NO,
f"Signature is already {len(sig)} bytes. Continue?",
)
!= idaapi.ASKBTN_YES
):
raise Unexpected("Signature not unique within length constraints")
bytes_since_last_check = 0 # Reset counter after user confirmation

# Check function boundary constraint
if (
not cfg.ask_longer_signature
or idaapi.ask_yn(
idaapi.ASKBTN_NO,
f"Signature is already {len(sig)} bytes. Continue?",
)
!= idaapi.ASKBTN_YES
not cfg.continue_outside_of_function
and start_fn
and cur_ea >= start_fn.end_ea
):
raise Unexpected("Signature not unique within length constraints")
bytes_since_last_check = 0 # Reset counter after user confirmation

# Check function boundary constraint
if (
not cfg.continue_outside_of_function
and start_fn
and cur_ea >= start_fn.end_ea
):
raise Unexpected("Signature left function scope without being unique")

self.processor.append_instruction_to_sig(
sig, cur_ea, ins, cfg.wildcard_operands, cfg.wildcard_optimized
)
bytes_since_last_check += ins_len
raise Unexpected("Signature left function scope without being unique")

if SignatureSearcher.is_unique(f"{sig:ida}"):
sig.trim_signature()
return sig
self.processor.append_instruction_to_sig(
sig, cur_ea, ins, cfg.wildcard_operands, cfg.wildcard_optimized
)
bytes_since_last_check += ins_len

count = SignatureSearcher.count_matches(f"{sig:ida}")
# SignatureSearcher.find_all polls idaapi_user_canceled inside
# its scan loop and bails when set, returning whatever partial
# count it had so far (often 0). If we stored that, the
# partial-on-cancel path would show "0 matches" for a signature
# that actually matches many places.
#
# When the call was interrupted, keep last_match_count at its
# prior trustworthy value (the count from the previous fully
# completed iteration). The reported number then corresponds
# to a signature one instruction shorter than the partial we
# emit, which means it is an UPPER BOUND on the partial's
# actual match count -- a useful number, not a meaningless 0.
if not idaapi_user_canceled():
last_match_count = count
if count == 1:
sig.trim_signature()
return GeneratedSignature(sig, Match(ea))
except UserCanceledError:
# InstructionWalker raises UserCanceledError on cancel too (issue #18).
# Honor the policy here as well.
if policy.return_partial_on_cancel:
return _build_partial()
raise

raise Unexpected("Signature not unique (reached end of analysis)")

Expand Down Expand Up @@ -1485,6 +1594,7 @@ def make_signature(
end: int | None = None,
*,
progress_reporter: typing.Optional[ProgressReporter] = None,
policy: GenerationPolicy = GenerationPolicy.strict(),
) -> GeneratedSignature:
"""Creates a signature for a single address (unique) or an address range.

Expand Down Expand Up @@ -1527,13 +1637,15 @@ def make_signature(
if end is None:
# Create unique signature generator via factory method
generator = self._create_generator(for_range=False, progress_reporter=progress_reporter)
sig = generator.generate(start_ea, cfg)
return GeneratedSignature(sig, Match(start_ea))
# UniqueSignatureGenerator.generate returns a GeneratedSignature
# directly so it can carry status + match_count on cancel.
return generator.generate(start_ea, cfg, policy=policy)

if end <= start_ea:
raise Unexpected("End address must be after start address")

# Create range signature generator via factory method
# Create range signature generator via factory method.
# Range generator returns a bare Signature; policy is not applicable.
generator = self._create_generator(for_range=True, progress_reporter=progress_reporter)
sig = generator.generate(start_ea, end, cfg)
return GeneratedSignature(sig)
Expand Down Expand Up @@ -1830,9 +1942,14 @@ def find_all(ida_signature: str) -> list[Match]:
ea = hit + 1
return out

@classmethod
def count_matches(cls, ida_signature: str) -> int:
"""Return the number of matches for the given IDA-format signature."""
return len(cls.find_all(ida_signature))

@classmethod
def is_unique(cls, ida_signature: str) -> bool:
return len(cls.find_all(ida_signature)) == 1
return cls.count_matches(ida_signature) == 1


# no cover: start
Expand Down Expand Up @@ -2114,7 +2231,8 @@ def __init__(self) -> None:
<#Enable wildcarding for operands, to improve stability of created signatures#Wildcards for operands:{cWildcardOperands}>
<#Don't stop signature generation when reaching end of function#Continue when leaving function scope:{cContinueOutside}>
<#Wildcard the whole instruction when the operand (usually a register) is encoded into the operator#Wildcard optimized / combined instructions:{cWildcardOptimized}>
<#Opt-in -- show periodic 'Continue?' prompts while generating. Default is a wait-box with a Cancel button.#Enable continue prompt (opt-in):{cEnablePrompt}>{cGroupOptions}>
<#Opt-in -- show periodic 'Continue?' prompts while generating. Default is a wait-box with a Cancel button.#Enable continue prompt (opt-in):{cEnablePrompt}>
<#Opt-in -- when you cancel a unique-signature search, output the partial signature (with match count) instead of nothing. Default off. (Issue #22)#Output partial signature on cancel (opt-in):{cOutputPartialOnCancel}>{cGroupOptions}>

<Operand types...:{bOperandTypes}><Other options...:{bOtherOptions}>
"""
Expand All @@ -2129,9 +2247,15 @@ def __init__(self) -> None:
("rIDASig", "rx64DbgSig", "rByteArrayMaskSig", "rRawBytesBitmaskSig")
),
"cGroupOptions": idaapi.Form.ChkGroupControl(
("cWildcardOperands", "cContinueOutside", "cWildcardOptimized", "cEnablePrompt"),
(
"cWildcardOperands",
"cContinueOutside",
"cWildcardOptimized",
"cEnablePrompt",
"cOutputPartialOnCancel",
),
# Bits: 1 (wildcards) + 4 (wildcard optimized). Bit 8 (enable
# prompt) defaults OFF; the wait-box Cancel handles long runs.
# prompt) and bit 16 (output partial on cancel) default OFF.
value=5,
),
"bOperandTypes": F.ButtonInput(self.ConfigureOperandWildcardBitmask),
Expand Down Expand Up @@ -2272,6 +2396,7 @@ def run(self, ctx) -> None:
continue_outside_of_function = bool(form.cGroupOptions.value & 2) # type: ignore
wildcard_optimized = bool(form.cGroupOptions.value & 4) # type: ignore
enable_continue_prompt = bool(form.cGroupOptions.value & 8) # type: ignore
output_partial_on_cancel = bool(form.cGroupOptions.value & 16) # type: ignore

# Create SigMakerConfig
config = SigMakerConfig(
Expand All @@ -2280,15 +2405,23 @@ def run(self, ctx) -> None:
continue_outside_of_function=continue_outside_of_function,
wildcard_optimized=wildcard_optimized,
enable_continue_prompt=enable_continue_prompt,
output_partial_on_cancel=output_partial_on_cancel,
)

try:
if action == Action.CREATE_UNIQUE:
ea = idaapi.get_screen_ea()
policy = (
GenerationPolicy.permissive()
if config.output_partial_on_cancel
else GenerationPolicy.strict()
)
with ProgressDialog(
"Generating signature...\n\nPress Cancel to stop"
):
signature = SignatureMaker().make_signature(ea, config)
signature = SignatureMaker().make_signature(
ea, config, policy=policy
)
signature.display(config)
elif action == Action.FIND_XREF:
ea = idaapi.get_screen_ea()
Expand Down
Loading
Loading