diff --git a/pyproject.toml b/pyproject.toml index 9de5201..78f3bd3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ ] dependencies = [ - "polyswarm_api>=3.15", + "polyswarm_api>=3.16", "click>=7.1", "colorama>=0.4.6", "click-log>=0.4.0", diff --git a/src/polyswarm/client/notification.py b/src/polyswarm/client/notification.py new file mode 100644 index 0000000..7dab708 --- /dev/null +++ b/src/polyswarm/client/notification.py @@ -0,0 +1,16 @@ +import logging + +import click + +from polyswarm.client.notification_webhook import webhook + +logger = logging.getLogger(__name__) + + +@click.group(short_help='Interact with Polyswarm notification systems.') +def notification(): + pass + + +# Add webhook as a subcommand of notification +notification.add_command(webhook) diff --git a/src/polyswarm/client/notification_webhook.py b/src/polyswarm/client/notification_webhook.py new file mode 100644 index 0000000..a699d7f --- /dev/null +++ b/src/polyswarm/client/notification_webhook.py @@ -0,0 +1,109 @@ +import logging + +import click + +from polyswarm.client import utils + +logger = logging.getLogger(__name__) + + +@click.group(short_help='Interact with Notification Webhooks in Polyswarm.') +def webhook(): + pass + + +@webhook.command('create', short_help='Create a new webhook.') +@click.argument('webhook-uri', type=click.STRING, required=True) +@click.argument('secret', type=click.STRING, required=True) +@click.option('--status', type=click.Choice(['enabled', 'disabled']), default='enabled', + help='Webhook status (default: enabled).') +@click.option('--events', type=click.STRING, multiple=True, + help='Event types to subscribe to (can be specified multiple times).') +@click.pass_context +def create(ctx, webhook_uri, secret, status, events): + """ + Create a new webhook. + + WEBHOOK_URI: The URI where notification webhook events should be sent + SECRET: The secret key used for HMAC signature verification + """ + api = ctx.obj['api'] + output = ctx.obj['output'] + + result = api.notification_webhook_create(webhook_uri=webhook_uri, secret=secret, status=status, events=events) + output.webhook(result) + + +@webhook.command('get', short_help='Get a webhook by ID.') +@click.argument('webhook-id', callback=utils.validate_id) +@click.pass_context +def get(ctx, webhook_id): + """ + Get a notification webhook by ID. + """ + api = ctx.obj['api'] + output = ctx.obj['output'] + output.webhook(api.notification_webhook_get(webhook_id)) + + +@webhook.command('update', short_help='Update an existing webhook.') +@click.argument('webhook-id', callback=utils.validate_id) +@click.option('--webhook-uri', type=click.STRING, help='The new webhook URI.') +@click.option('--secret', type=click.STRING, help='The new secret for HMAC signing.') +@click.option('--status', type=click.Choice(['enabled', 'disabled']), help='The new status.') +@click.option('--events', type=click.STRING, multiple=True, + help='Event types to subscribe to (can be specified multiple times).') +@click.pass_context +@utils.any_provided('webhook_uri', 'secret', 'status', 'events') +def update(ctx, webhook_id, webhook_uri, secret, status, events): + """ + Update an existing notification webhook. + """ + api = ctx.obj['api'] + output = ctx.obj['output'] + + result = api.notification_webhook_update( + webhook_id=webhook_id, + webhook_uri=webhook_uri, + secret=secret, + status=status, + events=events + ) + output.webhook(result) + + +@webhook.command('delete', short_help='Delete a webhook.') +@click.argument('webhook-id', callback=utils.validate_id) +@click.pass_context +def delete(ctx, webhook_id): + """ + Delete a notification webhook. + """ + api = ctx.obj['api'] + output = ctx.obj['output'] + api.notification_webhook_delete(webhook_id) + click.echo(f'Webhook {webhook_id} deleted successfully') + + +@webhook.command('list', short_help='List all webhooks.') +@click.pass_context +def list_webhooks(ctx): + """ + List all notification webhooks for the current account. + """ + api = ctx.obj['api'] + output = ctx.obj['output'] + for webhook_obj in api.notification_webhook_list(): + output.webhook(webhook_obj) + + +@webhook.command('test', short_help='Test a webhook by sending a test payload.') +@click.argument('webhook-id', callback=utils.validate_id) +@click.pass_context +def test(ctx, webhook_id): + """ + Test a notification webhook by sending a test payload. + """ + api = ctx.obj['api'] + api.notification_webhook_test(webhook_id) + click.echo(f'Test payload sent to webhook {webhook_id}') diff --git a/src/polyswarm/client/polyswarm.py b/src/polyswarm/client/polyswarm.py index f1508e0..1c6b4db 100644 --- a/src/polyswarm/client/polyswarm.py +++ b/src/polyswarm/client/polyswarm.py @@ -30,6 +30,7 @@ from polyswarm.client.bundle import bundle from polyswarm.client.report_template import report_template from polyswarm.client.account import account +from polyswarm.client.notification import notification logger = logging.getLogger(__name__) @@ -156,7 +157,7 @@ def polyswarm_cli(ctx, api_key, api_uri, output_file, output_format, color, verb cat, stream, rescan, rescan_id, rules, link, tag, family, metadata, engine, known, sandbox, sandbox_list, - activity, report, report_template, account, bundle, + activity, report, report_template, account, bundle, notification, ] for command in commands: diff --git a/src/polyswarm/formatters/json.py b/src/polyswarm/formatters/json.py index 6ea2822..2473dcc 100644 --- a/src/polyswarm/formatters/json.py +++ b/src/polyswarm/formatters/json.py @@ -166,6 +166,8 @@ def account_features(self, result, write=True): def llm_prompt_config(self, result, write=True): click.echo(self._to_json(result.json), file=self.out) + def webhook(self, result): + click.echo(self._to_json(result.json), file=self.out) class PrettyJSONOutput(JSONOutput): name = 'pretty-json' diff --git a/src/polyswarm/formatters/text.py b/src/polyswarm/formatters/text.py index e5c2acd..0557266 100644 --- a/src/polyswarm/formatters/text.py +++ b/src/polyswarm/formatters/text.py @@ -676,6 +676,20 @@ def llm_prompt_config(self, config, write=True): output.append(self._white(f'Scan-Only Prompt: {config.scan_only_prompt}')) return self._output(output, write) + def webhook(self, webhook, write=True): + output = [] + output.append(self._white('============================= Webhook =============================')) + output.append(self._blue(f'ID: {webhook.id}')) + output.append(self._white(f'Webhook URI: {webhook.webhook_uri}')) + output.append(self._white(f'Account Number: {webhook.account_number}')) + if webhook.team_account_number: + output.append(self._white(f'Team Account Number: {webhook.team_account_number}')) + status_writer = self._green if webhook.status == 'enabled' else self._yellow + output.append(status_writer(f'Status: {webhook.status}')) + if webhook.events: + output.append(self._white(f'Events: {json.dumps(webhook.events, indent=2)}')) + return self._output(output, write) + @is_grouped def _white(self, text): return click.style(text, fg='white')