Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v3
Expand Down
6 changes: 6 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
1.13.0
-----
- Add command for notifying alerts by CSV
- Drop support for Python 3.8
- Add support for Python 3.13

1.12.0
-----
- Add command aliases with dashes (e.g., analyze-by-list) while maintaining backward compatibility with underscore versions
Expand Down
2 changes: 1 addition & 1 deletion intezer_analyze_cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.12.0'
__version__ = '1.13.0'
36 changes: 36 additions & 0 deletions intezer_analyze_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,42 @@ def upload_emails_in_directory(emails_root_directory: str, ignore_directory_coun
click.echo('Unexpected error occurred, please contact us at support@intezer.com '
f'and attach the log file in {utilities.log_file_path}')


@main_cli.group('alerts', short_help='Alert management commands')
def alerts():
"""Alert management commands for Intezer Analyze."""
pass


@alerts.command('notify-from-csv', short_help='Notify alerts from CSV file')
@click.argument('csv_path', type=click.Path(exists=True, dir_okay=False))
def notify_from_csv(csv_path: str):
"""Notify alerts from a CSV file containing alert IDs and environments.

\b
CSV_PATH: Path to CSV file with 'id' and 'environment' columns.

\b
CSV Format:
The CSV file should have the following columns:
- id: Alert ID (required)
- environment: Environment name (required)

\b
Examples:
Notify alerts from CSV file:
$ intezer-analyze alerts notify-from-csv ~/alerts.csv
"""
try:
create_global_api()
commands.notify_alerts_from_csv_command(csv_path=csv_path)
except click.Abort:
raise
except Exception:
logger.exception('Unexpected error occurred')
click.echo('Unexpected error occurred, please contact us at support@intezer.com '
f'and attach the log file in {utilities.log_file_path}')

if __name__ == '__main__':
try:
main_cli()
Expand Down
104 changes: 101 additions & 3 deletions intezer_analyze_cli/commands.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import csv
import logging
import os
from io import BytesIO
from typing import Dict
from typing import List
from typing import Optional
from email.utils import parsedate_to_datetime

Expand Down Expand Up @@ -181,7 +184,7 @@ def index_by_txt_file_command(path: str, index_as: str, family_name: str):
except sdk_errors.IntezerError as e:
index_exceptions.append(f'Failed to index hash: {sha256} error: {e}')
logger.exception('Failed to index hash', extra=dict(sha256=sha256))
except sdk_errors.IndexFailed:
except sdk_errors.IndexFailed as e:
index_exceptions.append(f'Failed to index hash: {sha256} error: {e}')
logger.exception('Failed to index hash', extra=dict(sha256=sha256))
index_progress.update(1)
Expand Down Expand Up @@ -355,7 +358,7 @@ def send_phishing_emails_from_directory_command(path: str,
unsupported_number = 0
emails_dates = []

for root, dirs, files in os.walk(path):
for root, _, files in os.walk(path):
files = [f for f in files if not is_hidden(os.path.join(root, f))]

number_of_files = len(files)
Expand Down Expand Up @@ -385,7 +388,7 @@ def send_phishing_emails_from_directory_command(path: str,
except Exception:
continue

except sdk_errors.IntezerError as ex:
except sdk_errors.IntezerError:
logger.exception('Error while analyzing directory')
failed_number += 1
except Exception:
Expand Down Expand Up @@ -449,3 +452,98 @@ def _create_analysis_id_file(directory: str, analysis_id: str):
logger.exception('Could not create analysis_id.txt file', extra=dict(directory=directory))
click.echo(f'Could not create analysis_id.txt file in {directory}')
raise


def notify_alerts_from_csv_command(csv_path: str):
"""
Notify alerts from a CSV file containing alert IDs and environments.

:param csv_path: Path to CSV file with 'id' and 'environment' columns
"""
try:
alerts_data = _read_alerts_from_csv(csv_path)
success_number = 0
failed_number = 0
no_channels_number = 0

with click.progressbar(length=len(alerts_data),
label='Notifying alerts',
show_pos=True,
width=0) as progressbar:
for alert_data in alerts_data:
alert_id = alert_data['id']
environment = alert_data['environment']

try:
alert = Alert(alert_id=alert_id, environment=environment)
notified_channels = alert.notify()

if notified_channels:
success_number += 1
else:
no_channels_number += 1
logger.info('Alert notified but no channels configured',
extra=dict(alert_id=alert_id, environment=environment))

except sdk_errors.AlertNotFoundError:
click.echo(f'Alert {alert_id} not found')
logger.info('Alert not found', extra=dict(alert_id=alert_id, environment=environment))
failed_number += 1
except sdk_errors.AlertInProgressError:
click.echo(f'Alert {alert_id} is still in progress')
logger.info('Alert in progress', extra=dict(alert_id=alert_id, environment=environment))
failed_number += 1
except sdk_errors.IntezerError as e:
logger.exception('Error while notifying alert', extra=dict(alert_id=alert_id, environment=environment))
failed_number += 1
except Exception:
logger.exception('Unexpected error while notifying alert', extra=dict(alert_id=alert_id, environment=environment))
failed_number += 1

progressbar.update(1)

if success_number > 0:
click.echo(f'{success_number} alerts notified successfully')

if no_channels_number > 0:
click.echo(f'{no_channels_number} alerts didn\'t triggered any notification')

if failed_number > 0:
click.echo(f'{failed_number} alerts failed to notify')

except IOError:
click.echo(f'No read permissions for {csv_path}')
logger.exception('Error reading CSV file', extra=dict(path=csv_path))
raise click.Abort()
except Exception:
logger.exception('Unexpected error occurred while processing CSV file', extra=dict(path=csv_path))
click.echo('Unexpected error occurred while processing CSV file')
raise click.Abort()


def _read_alerts_from_csv(csv_path: str) -> List[Dict[str, Optional[str]]]:
"""
Read alert IDs and environments from CSV file.

:param csv_path: Path to CSV file
:return: List of dictionaries with 'id' and 'environment' keys
:raises ValueError: If required columns are missing
"""
alerts_data = []

with open(csv_path, 'r', newline='', encoding='utf-8-sig') as csvfile:
reader = csv.DictReader(csvfile)

if not reader.fieldnames or 'id' not in reader.fieldnames:
raise ValueError('CSV file must contain an "id" column')

for row in reader:
alert_id = row['id'].strip()
environment = row['environment'].strip()

alerts_data.append({'id': alert_id, 'environment': environment})

if not alerts_data:
raise ValueError('No valid alert data found in CSV file')

return alerts_data
2 changes: 1 addition & 1 deletion requirements-prod.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
click==7.1.2
intezer-sdk==1.21.9
intezer-sdk==1.23.0
4 changes: 2 additions & 2 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pytest>=6.2.5
responses==0.17.0
pytest>=8.4.1
responses==0.25.8
10 changes: 5 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ def rel(*xs):

install_requires = [
'click==7.1.2',
'intezer-sdk>=1.15.2,<2'
'intezer-sdk>=1.23.0,<2'
]
tests_require = [
'pytest==6.1.2',
'responses==0.17.0'
'pytest==8.4.1',
'responses==0.25.8'
]

with open('README.md') as f:
Expand All @@ -35,11 +35,11 @@ def rel(*xs):
description='Client library for Intezer cloud service',
author='Intezer Labs ltd.',
classifiers=[
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12'
'Programming Language :: Python :: 3.13'
],
keywords='intezer',
packages=['intezer_analyze_cli'],
Expand All @@ -52,6 +52,6 @@ def rel(*xs):
license='Apache License v2',
long_description=long_description,
long_description_content_type='text/markdown',
python_requires='>=3.8',
python_requires='>=3.9',
zip_safe=False
)
39 changes: 39 additions & 0 deletions tests/unit/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,45 @@ def test_upload_multiple_eml_files_ignore(self, send_phishing_emails_from_direct
ignore_directory_count_limit=True)


class AlertsSpec(CliSpec):
def setUp(self):
super(AlertsSpec, self).setUp()

create_global_api_patcher = patch('intezer_analyze_cli.cli.create_global_api')
self.create_global_api_patcher_mock = create_global_api_patcher.start()
self.addCleanup(create_global_api_patcher.stop)

key_store.get_stored_api_key = MagicMock(return_value='api_key')

@patch('intezer_analyze_cli.commands.notify_alerts_from_csv_command')
def test_alerts_notify_from_csv_success(self, notify_alerts_from_csv_command_mock):
# Arrange
with tempfile.TemporaryDirectory() as temp_dir:
csv_file_path = os.path.join(temp_dir, 'alerts.csv')
with open(csv_file_path, 'w') as f:
f.write('id,environment\ntest-alert-1,production\ntest-alert-2,staging\n')

# Act
result = self.runner.invoke(cli.main_cli,
['alerts', 'notify-from-csv', csv_file_path])

# Assert
self.assertEqual(result.exit_code, 0, result.exception)
self.assertTrue(notify_alerts_from_csv_command_mock.called)
notify_alerts_from_csv_command_mock.assert_called_once_with(csv_path=csv_file_path)

def test_alerts_notify_from_csv_file_not_exists_returns_error(self):
# Arrange
non_existent_file = '/non/existent/file.csv'

# Act
result = self.runner.invoke(cli.main_cli,
['alerts', 'notify-from-csv', non_existent_file])

# Assert
self.assertEqual(result.exit_code, 2)
self.assertTrue(b'does not exist' in result.stdout_bytes)

class CliIndexSpec(CliSpec):
def setUp(self):
super(CliIndexSpec, self).setUp()
Expand Down
Loading