Exemplar microengine using the new webhooks bounty-passing system

polyswarm/microengine-webhooks-py

microengine-webhooks-py

This project provides a simple webhook microengine that can serve as a base for building more complex microengines. To get started, you only need to edit one file containing two functions.

Quickstart

Install and test

Clone this repository.

git clone https://github.com/polyswarm/microengine-webhooks-py.git

Install the package in development mode to allow customization. We recommend using a virtual environment.

pip install -e .[web,gunicorn,tests]

Check that the installation is working correctly:

$ python -m microenginewebhookspy.engine analyze --check-eicar
{
  "metadata": {
    "product": "eicar-sample",
    "scanner": {
      "version": "1.0",
      "environment": {
        "operating_system": "Linux",
        "architecture": "x86_64"
      }
    },
    "malware_family": "EICAR",
    "confidence": 1.0
  },
  "verdict": "malicious",
  "bid": 999900000000000000
}

Run the tests

$ pytest -v
================== test session starts ==================
(...)
configfile: pyproject.toml
plugins: requests-mock-1.12.1, mock-3.15.1
collected 4 items

tests/test_scan.py::test_scan_file_malicious PASSED [ 25%]
tests/test_scan.py::test_scan_file_benign PASSED  [ 50%]
tests/test_server.py::test_valid_bounty_to_api PASSED [ 75%]
tests/test_server.py::test_invalid_bounty_to_api PASSED [100%]
============= 4 passed, 4 warnings in 0.09s =============

Now you have a working Engine that detects EICAR as malware.

Implementing your first engine

In microenginewebhookspy/engine.py there are two functions in fewer than 40 lines of code. The most important one is analyze(bounty), which is where you wire up your malware detection tool.

# import polyswarm_engine as ps

@engine.register_analyzer
def analyze(bounty: ps.Bounty) -> ps.Analysis:
    contents = ps.get_artifact_bytes(bounty)

    if EICAR_STRING in contents:
        verdict = ps.MALICIOUS
        metadata = {'malware_family': 'EICAR', 'confidence': 1.0}
    else:
        verdict = ps.BENIGN
        metadata = {}

    return {
        'verdict': verdict,
        'bid': ps.bid_max(bounty),
        'metadata': metadata,
    }

Your return dict will be checked against polyswarm_engine.Analysis rules, e.g. a verdict is present and metadata['confidence'] is a float between 0.0 and 1.0 if provided. For the full ruleset, have a peek at the polyswarm_engine codebase.
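
To make the two rules mentioned above concrete, here is a minimal sketch of a local pre-check. It is not the polyswarm_engine ruleset: the function name check_analysis is hypothetical, and the set of verdict strings is assumed from the CLI output shown in this README.

```python
# Verdict strings as they appear in the CLI output in this README.
# Assumption: this set is complete; the real ruleset may differ.
VALID_VERDICTS = {"malicious", "benign", "unknown"}


def check_analysis(analysis: dict) -> list:
    """Return a list of problems; an empty list means the dict looks well-formed."""
    problems = []

    # Rule 1: a verdict must be present.
    if analysis.get("verdict") not in VALID_VERDICTS:
        problems.append("verdict missing or not recognized")

    # Rule 2: confidence is optional, but if given it must be a float in [0.0, 1.0].
    confidence = (analysis.get("metadata") or {}).get("confidence")
    if confidence is not None:
        if not isinstance(confidence, float):
            problems.append("confidence must be a float")
        elif not 0.0 <= confidence <= 1.0:
            problems.append("confidence must be between 0.0 and 1.0")

    return problems
```

Calling check_analysis() on the dict you are about to return can surface mistakes early, before the library's own validation rejects them.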

Test your engine

During implementation, you can run ad-hoc tests with the python -m microenginewebhookspy.engine analyze tool. Executing the file directly also works:

$ cd microenginewebhookspy
$ ./engine.py analyze --help
Usage: eicar-sample analyze [OPTIONS] [ARTIFACTS]...

  Analyze artifacts

Options:
  -v, --verbose
  --check-empty     Verify this engine can analyze an empty
                    bounty
  --check-eicar     Verify this engine can analyze EICAR test
                    file
  --check-wicar, --check-exploit-url
                    Verify this engine can analyze the WICAR
                    exploit kit URL
  -t, --artifact-type [bounty|file|url]
                    Artifact type to use when constructing
                    bounties. 'bounty' loads manually
                    constructed bounties, treating each argument
                    as the path to a JSON-encoded bounty object
  --help            Show this message and exit.

The returned value will be checked for structure.

This CLI can also scan files on your disk for local testing purposes:

$ ./engine.py analyze ~/Downloads/Firefox\ Installer.exe
{
  ...
  "verdict": "benign",
  "bid": 999900000000000000
}

We recommend that you always check that your engine:

  • Handles EMPTY bounties
  • Detects EICAR, if creating a file-scanning engine
  • Detects WICAR, if creating a URL-scanning engine
  • Returns UNKNOWN for unsupported file types
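
These checks can be scripted so they run together. The sketch below only uses the --check-* flags listed in the help output above. The helper names are hypothetical, and it assumes that a failed check makes the CLI exit with a non-zero status, which this README does not confirm.

```python
import subprocess
import sys

# Flags taken from the `analyze --help` output above.
FILE_ENGINE_CHECKS = ["--check-empty", "--check-eicar"]
URL_ENGINE_CHECKS = ["--check-empty", "--check-wicar"]


def build_commands(flags):
    """Build one `python -m microenginewebhookspy.engine analyze <flag>` command per flag."""
    base = [sys.executable, "-m", "microenginewebhookspy.engine", "analyze"]
    return [base + [flag] for flag in flags]


def run_checks(flags):
    """Run each check and return the flags whose command exited non-zero.

    Assumption: a failing check (for example the AssertionError shown in this
    README) results in a non-zero exit status.
    """
    failed = []
    for flag, command in zip(flags, build_commands(flags)):
        result = subprocess.run(command, capture_output=True, text=True)
        if result.returncode != 0:
            failed.append(flag)
    return failed
```

For a file-scanning engine, run_checks(FILE_ENGINE_CHECKS) would be expected to return an empty list once the engine behaves as intended.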

Example: Checking the file type

If you run an analysis for WICAR, the template implementation will return BENIGN and the check will fail:

$ ./engine.py analyze --check-wicar
{
  ...
  "verdict": "benign",
  "bid": 999900000000000000
}
...
AssertionError: Received 'benign' instead of malicious

For example, to handle URL bounties gracefully, you can add these lines to engine.py:

# import polyswarm_engine as ps

 @engine.register_analyzer
 def analyze(bounty: ps.Bounty) -> ps.Analysis:
+    if not ps.bounty.is_file_artifact(bounty):
+        log.error("Received non-file artifact bounty")
+        return ps.bounty.UNSUPPORTED
     contents = ps.get_artifact_bytes(bounty)

The engine will now answer non-file bounties with UNSUPPORTED, which is reported as an "unknown" verdict with a bid of 0:

$ ./engine.py analyze --check-wicar
2025-10-22 20:30:19,022 - ERROR [engine.py:28][analyze] Received non-file artifact bounty
{
  ...
  "verdict": "unknown",
  "bid": 0
}
...
AssertionError: Received 'unknown' instead of malicious

This is fine for an EICAR engine, which is not supposed to handle URL bounties.
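
The same idea extends to unsupported file types. As a rough sketch, an engine that only understands executables could inspect the leading "magic" bytes of the content before scanning. The helper names and the supported-type table are illustrative and are not part of polyswarm_engine; the "unknown" verdict and zero bid mirror the CLI output above.

```python
# Leading byte signatures for the formats this hypothetical engine supports.
SUPPORTED_MAGIC = {
    b"MZ": "pe",        # Windows PE executables start with "MZ"
    b"\x7fELF": "elf",  # ELF binaries start with 0x7F 'E' 'L' 'F'
}


def detect_file_type(contents: bytes):
    """Return a short type name for supported content, or None otherwise."""
    for magic, name in SUPPORTED_MAGIC.items():
        if contents.startswith(magic):
            return name
    return None


def unknown_if_unsupported(contents: bytes):
    """Return an 'unknown' result for unsupported content, else None to keep scanning."""
    if detect_file_type(contents) is None:
        return {"verdict": "unknown", "bid": 0, "metadata": {}}
    return None
```

Inside analyze(), such a helper would be called right after reading the artifact bytes, returning early whenever it yields a result.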

Where to go from here?

This simple engine now behaves correctly for both file and URL bounties. You are free to integrate your existing malware-detection tool within engine.py.

To help you get started, tooling exists inside the polyswarm_engine package. For example, if your tool can natively scan files on the filesystem via CLI, there is a context manager that downloads the file and stores it in a temporary folder, making your life easier:

# import polyswarm_engine as ps

 @engine.register_analyzer
 def analyze(bounty: ps.Bounty) -> ps.Analysis:
-    contents = ps.get_artifact_bytes(bounty)
+    with ps.ArtifactTempfile(bounty) as path:
+        my_tool_do_handle_a_file(path)

That and other niceties are covered in full in the PolySwarm Documentation, especially in the PolySwarm Engine Package section: https://docs.polyswarm.io/engines/polyswarm-engine-package/
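
Once you have a path on disk, wrapping a command-line scanner mostly comes down to running it and translating its result. The sketch below assumes a hypothetical scanner that exits with 0 for clean files and 1 for detections. Real tools differ, so adjust the mapping to your scanner's documented exit codes.

```python
import subprocess
import sys


def scan_path(path: str, command_prefix: list) -> str:
    """Run `command_prefix + [path]` and map the exit code to a verdict string.

    Assumed convention (hypothetical scanner): 0 = clean, 1 = detection,
    anything else = the scanner could not decide.
    """
    result = subprocess.run(command_prefix + [path], capture_output=True, text=True)
    if result.returncode == 0:
        return "benign"
    if result.returncode == 1:
        return "malicious"
    return "unknown"


# A fake scanner built from the Python interpreter, useful for trying the
# mapping locally without a real tool installed. It always exits with 1.
FAKE_DETECTING_SCANNER = [sys.executable, "-c", "import sys; sys.exit(1)"]
```

For example, scan_path(path, ["my-scanner", "--quiet"]) would run a scanner named my-scanner (a placeholder name) against the temporary file.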

How does it work?

During the tests above, the engine.py analyze tool simulated a Bounty that had already been received and enqueued for processing inside a Celery worker. It then called the analyze() function with that Bounty "dict".

For real engines, PolySwarm will send events as HTTP POST requests to your server's webhook, configured on the PolySwarm website. Engines need to listen passively until a new event arrives.

Your webserver will receive HTTP requests. A Python WSGI application handles the requests and enqueues a job to be processed by a worker.

The worker runs your analyze() function, which decides the appropriate response. In the same job, the worker sends the response back to PolySwarm.
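
The flow described above can be illustrated with the standard library alone. This is a simplified stand-in, not the project's implementation: an in-process queue.Queue replaces the AMQP broker and Celery, the 202 status code is an assumption, and the bounty is treated as an opaque JSON object.

```python
import io
import json
import queue

jobs = queue.Queue()  # stand-in for the AMQP broker


def analyze(bounty: dict) -> dict:
    # Placeholder for your real analyze() function.
    return {"verdict": "unknown", "bid": 0, "metadata": {}}


def webhook_app(environ, start_response):
    """WSGI app: accept a POSTed JSON bounty and enqueue it for a worker."""
    if environ.get("REQUEST_METHOD") != "POST":
        start_response("405 Method Not Allowed", [("Content-Type", "text/plain")])
        return [b"POST only"]
    length = int(environ.get("CONTENT_LENGTH") or 0)
    bounty = json.loads(environ["wsgi.input"].read(length) or b"{}")
    jobs.put(bounty)  # the request returns right away; analysis happens later
    start_response("202 Accepted", [("Content-Type", "text/plain")])
    return [b"queued"]


def work_once(send_response):
    """Worker step: take one job, analyze it, and send the result back."""
    bounty = jobs.get()
    send_response(bounty, analyze(bounty))
    jobs.task_done()


def post(payload: dict) -> str:
    """Call the WSGI app in-process, as a webserver would; return the status line."""
    body = json.dumps(payload).encode()
    environ = {
        "REQUEST_METHOD": "POST",
        "CONTENT_LENGTH": str(len(body)),
        "wsgi.input": io.BytesIO(body),
    }
    statuses = []
    webhook_app(environ, lambda status, headers: statuses.append(status))
    return statuses[0]
```

Separating the request handler from the worker keeps the webhook responsive even when a scan is slow, which is the reason for putting a queue between the two.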

How do I run this for real?

Running this outside of the testing scenario needs:

  • An HTTP webserver running your WSGI application microenginewebhookspy.wsgi
  • An AMQP broker (RabbitMQ, for example)
  • A Celery worker running microenginewebhookspy.tasks

Both the HTTP server and the Celery worker need to share the PSENGINE_BROKER_URL environment variable, which defaults to 'amqp://user:password@rabbitmq:5672'.

When running outside of the local host, you will need to adjust this environment variable. Refer to docker/docker-compose.yaml for a full local example, including an integration server on port 5000 that can send bounties via HTTP for testing.
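
To see which broker a process would talk to, you can resolve the variable the same way in a small script. This is a sketch with a hypothetical helper name. It only reads PSENGINE_BROKER_URL with the default quoted above and does not reflect how the package itself loads its configuration.

```python
import os
from urllib.parse import urlsplit

# Default quoted in this README.
DEFAULT_BROKER_URL = "amqp://user:password@rabbitmq:5672"


def broker_settings(environ=os.environ) -> dict:
    """Resolve the broker URL from the environment and split out its parts."""
    url = environ.get("PSENGINE_BROKER_URL", DEFAULT_BROKER_URL)
    parts = urlsplit(url)
    return {
        "url": url,
        "host": parts.hostname,
        "port": parts.port,
        "user": parts.username,
    }
```

Running this in both the webserver and the worker environments is a quick way to confirm they point at the same broker.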

More details about the workflow briefly explained above, along with recommendations and alternatives for common scenarios, are also available in the PolySwarm Documentation.
