Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 66 additions & 8 deletions modelscan/issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ class IssueSeverity(Enum):
CRITICAL = 4


class IssueCode:
UNSAFE_OPERATOR = Property("UNSAFE_OPERATOR", 1)
class IssueCode(Enum):
UNSAFE_OPERATOR = 1
INVALID_OPCODE = 2
UNUSED_VARIABLE = 3
PROTO_NOT_FIRST = 4
DUPLICATE_PROTO = 5
DUPLICATE_DIFFERENT_PROTO = 6
SUSPICIOUS_LOADER = 7


class IssueDetails(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -83,11 +89,19 @@ def __hash__(self) -> int:
)

def print(self) -> None:
issue_description = self.code.name
if self.code.value == IssueCode.UNSAFE_OPERATOR.value:
issue_description = "Unsafe operator"
else:
logger.error("No issue description for issue code %s", self.code)
issue_description_map = {
IssueCode.UNSAFE_OPERATOR: "Unsafe operator",
IssueCode.INVALID_OPCODE: "Invalid opcode",
IssueCode.UNUSED_VARIABLE: "Unused variable",
IssueCode.PROTO_NOT_FIRST: "PROTO not the first opcode",
IssueCode.DUPLICATE_PROTO: "Duplicate PROTO opcode",
IssueCode.DUPLICATE_DIFFERENT_PROTO: "Duplicate PROTO with different version",
IssueCode.SUSPICIOUS_LOADER: "Suspicious loader",
}

issue_description = issue_description_map.get(
self.code, f"Unknown issue for code {self.code}"
)

print(f"\n{issue_description} found:")
print(f" - Severity: {self.severity.name}")
Expand Down Expand Up @@ -131,19 +145,63 @@ def __init__(
severity: IssueSeverity,
source: Union[Path, str],
scanner: str = "",
message: str,
source: Union[str, Path],
opcode: Optional[int] = None
) -> None:
self.module = module
self.operator = operator
self.source = source
self.severity = severity
self.scanner = scanner
self.message = message
self.opcode = opcode
self.source = source

def output_lines(self) -> List[str]:
lines = []

# Include the message if it exists
if hasattr(self, 'message') and self.message:
lines.append(self.message)

# Append the opcode if it exists
if hasattr(self, 'opcode') and self.opcode is not None:
lines.append(f"Opcode: {self.opcode}")

# Include the unsafe operator description if the attributes exist
if hasattr(self, 'operator') and hasattr(self, 'module'):
lines.append(f"Description: Use of unsafe operator '{self.operator}' from module '{self.module}'")

# Append the source
lines.append(f"Source: {str(self.source)}")

return lines

class UnusedVariableIssueDetails(IssueDetails):
def __init__(self, variable: Any, source: Union[str, Path]) -> None:
self.variable = variable
self.source = source

def output_lines(self) -> List[str]:
return [
f"Description: Use of unsafe operator '{self.operator}' from module '{self.module}'",
f"Unused variable in pickle data: {self.variable}",
f"Source: {str(self.source)}",
]


class UnsafeLoaderIssueDetails(IssueDetails):
def __init__(self, module: str, loader: str, source: Union[str, Path]) -> None:
self.module = module
self.loader = loader
self.source = source

def output_lines(self) -> List[str]:
return [
f"Found suspicious loader from {self.source} : {self.module}.{self.loader}",
f"Source: {str(self.source)}",
]

def output_json(self) -> Dict[str, str]:
return {
"description": f"Use of unsafe operator '{self.operator}' from module '{self.module}'",
Expand Down