Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
]

dependencies = [
"polyswarm_api>=3.15",
"polyswarm_api>=3.16",
"click>=7.1",
"colorama>=0.4.6",
"click-log>=0.4.0",
Expand Down
16 changes: 16 additions & 0 deletions src/polyswarm/client/notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging

import click

from polyswarm.client.notification_webhook import webhook

logger = logging.getLogger(__name__)


@click.group(short_help='Interact with Polyswarm notification systems.')
def notification():
pass


# Add webhook as a subcommand of notification
notification.add_command(webhook)
109 changes: 109 additions & 0 deletions src/polyswarm/client/notification_webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import logging

import click

from polyswarm.client import utils

logger = logging.getLogger(__name__)


@click.group(short_help='Interact with Notification Webhooks in Polyswarm.')
def webhook():
pass


@webhook.command('create', short_help='Create a new webhook.')
@click.argument('webhook-uri', type=click.STRING, required=True)
@click.argument('secret', type=click.STRING, required=True)
@click.option('--status', type=click.Choice(['enabled', 'disabled']), default='enabled',
help='Webhook status (default: enabled).')
@click.option('--events', type=click.STRING, multiple=True,
help='Event types to subscribe to (can be specified multiple times).')
@click.pass_context
def create(ctx, webhook_uri, secret, status, events):
"""
Create a new webhook.

WEBHOOK_URI: The URI where notification webhook events should be sent
SECRET: The secret key used for HMAC signature verification
"""
api = ctx.obj['api']
output = ctx.obj['output']

result = api.notification_webhook_create(webhook_uri=webhook_uri, secret=secret, status=status, events=events)
output.webhook(result)


@webhook.command('get', short_help='Get a webhook by ID.')
@click.argument('webhook-id', callback=utils.validate_id)
@click.pass_context
def get(ctx, webhook_id):
"""
Get a notification webhook by ID.
"""
api = ctx.obj['api']
output = ctx.obj['output']
output.webhook(api.notification_webhook_get(webhook_id))


@webhook.command('update', short_help='Update an existing webhook.')
@click.argument('webhook-id', callback=utils.validate_id)
@click.option('--webhook-uri', type=click.STRING, help='The new webhook URI.')
@click.option('--secret', type=click.STRING, help='The new secret for HMAC signing.')
@click.option('--status', type=click.Choice(['enabled', 'disabled']), help='The new status.')
@click.option('--events', type=click.STRING, multiple=True,
help='Event types to subscribe to (can be specified multiple times).')
@click.pass_context
@utils.any_provided('webhook_uri', 'secret', 'status', 'events')
def update(ctx, webhook_id, webhook_uri, secret, status, events):
"""
Update an existing notification webhook.
"""
api = ctx.obj['api']
output = ctx.obj['output']

result = api.notification_webhook_update(
webhook_id=webhook_id,
webhook_uri=webhook_uri,
secret=secret,
status=status,
events=events
)
output.webhook(result)


@webhook.command('delete', short_help='Delete a webhook.')
@click.argument('webhook-id', callback=utils.validate_id)
@click.pass_context
def delete(ctx, webhook_id):
"""
Delete a notification webhook.
"""
api = ctx.obj['api']
output = ctx.obj['output']
api.notification_webhook_delete(webhook_id)
click.echo(f'Webhook {webhook_id} deleted successfully')


@webhook.command('list', short_help='List all webhooks.')
@click.pass_context
def list_webhooks(ctx):
"""
List all notification webhooks for the current account.
"""
api = ctx.obj['api']
output = ctx.obj['output']
for webhook_obj in api.notification_webhook_list():
output.webhook(webhook_obj)


@webhook.command('test', short_help='Test a webhook by sending a test payload.')
@click.argument('webhook-id', callback=utils.validate_id)
@click.pass_context
def test(ctx, webhook_id):
"""
Test a notification webhook by sending a test payload.
"""
api = ctx.obj['api']
api.notification_webhook_test(webhook_id)
click.echo(f'Test payload sent to webhook {webhook_id}')
3 changes: 2 additions & 1 deletion src/polyswarm/client/polyswarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from polyswarm.client.bundle import bundle
from polyswarm.client.report_template import report_template
from polyswarm.client.account import account
from polyswarm.client.notification import notification

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -156,7 +157,7 @@ def polyswarm_cli(ctx, api_key, api_uri, output_file, output_format, color, verb
cat, stream, rescan, rescan_id,
rules, link, tag, family, metadata,
engine, known, sandbox, sandbox_list,
activity, report, report_template, account, bundle,
activity, report, report_template, account, bundle, notification,
]

for command in commands:
Expand Down
2 changes: 2 additions & 0 deletions src/polyswarm/formatters/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ def account_features(self, result, write=True):
def llm_prompt_config(self, result, write=True):
click.echo(self._to_json(result.json), file=self.out)

def webhook(self, result):
click.echo(self._to_json(result.json), file=self.out)

class PrettyJSONOutput(JSONOutput):
name = 'pretty-json'
Expand Down
14 changes: 14 additions & 0 deletions src/polyswarm/formatters/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,20 @@ def llm_prompt_config(self, config, write=True):
output.append(self._white(f'Scan-Only Prompt: {config.scan_only_prompt}'))
return self._output(output, write)

def webhook(self, webhook, write=True):
output = []
output.append(self._white('============================= Webhook ============================='))
output.append(self._blue(f'ID: {webhook.id}'))
output.append(self._white(f'Webhook URI: {webhook.webhook_uri}'))
output.append(self._white(f'Account Number: {webhook.account_number}'))
if webhook.team_account_number:
output.append(self._white(f'Team Account Number: {webhook.team_account_number}'))
status_writer = self._green if webhook.status == 'enabled' else self._yellow
output.append(status_writer(f'Status: {webhook.status}'))
if webhook.events:
output.append(self._white(f'Events: {json.dumps(webhook.events, indent=2)}'))
return self._output(output, write)

@is_grouped
def _white(self, text):
return click.style(text, fg='white')
Expand Down