Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,32 @@
> As of v1.4.0, release candidates will be published in an effort to get new features out faster while still allowing
> time for full QA testing before moving the release candidate to a full release.

## v2.3.0 [2025-10-16]

__What's New:__

* Applications, Environments, Profiles, Resources, and Secrets have a new `--search-text|-S` flag for listing with `ls`.

__Enhancements:__

* Added `search_text` parameter to `list_[applications|environments|profiles|resources|secrets]`
* Added `list_requests` functionality.

__Bug Fixes:__

* Added `PYBRITIVE_ENCRYPTED_CREDENTIAL_PASSPHRASE` env var for AWS/k8s helpers.
* Refactored default `passphrase` as `uuid.getnode` doesn't afford repeatability in sandboxed environments, e.g. `uv`.
* Corrected `my_access` profiles with empty description getting errant `Resource` default value.
* Updated `list_[applications|environments]` to exclude unrelated `my-resources` profiles and replace `null` with `''`.

__Dependencies:__

* None

__Other:__

* Dropped temporary `_get_missing_session_attributes` method, API has been corrected.

## v2.2.3 [2025-08-06]

__What's New:__
Expand Down
2 changes: 1 addition & 1 deletion src/pybritive/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.2.3'
__version__ = '2.3.0'
96 changes: 54 additions & 42 deletions src/pybritive/britive_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ def user(self):
output += f' (alias: {alias})'
self.print(output, ignore_silent=True)

def list_secrets(self):
def list_secrets(self, search_text: Optional[str] = None):
self.login()
self.print(self.b.my_secrets.list(), ignore_silent=True)
self.print(self.b.my_secrets.list(search=search_text), ignore_silent=True)

def list_approvals(self):
self.login()
Expand All @@ -338,14 +338,34 @@ def list_approvals(self):
approvals.reverse()
self.print(approvals, ignore_silent=True)

def list_resources(self):
def list_requests(self):
self.login()
requests = []
for request in self.b.my_requests.list():
request.pop('resource', None)
request.pop('consumer', None)
request.pop('timeToApprove', None)
request.pop('validFor', None)
request.pop('action', None)
request.pop('approvers', None)
request.pop('expirationTimeApproval', None)
request.pop('updatedAt', None)
request.pop('actionBy', None)
request.pop('validForInDays', None)
requests.append(request)

requests = sorted(requests, key=lambda x: x['createdAt'])
requests.reverse()
self.print(requests, ignore_silent=True)

def list_resources(self, search_text: Optional[str] = None):
self.login()
found_resource_names = []
resources = []
if resource_limit := int(self.config.my_resources_retrieval_limit):
profiles = self.b.my_resources.list(size=resource_limit)['data']
else:
profiles = self.b.my_resources.list_profiles()
resource_limit = int(self.config.my_resources_retrieval_limit)
profiles = self.b.my_resources.list(search_text=search_text, size=resource_limit)
if resource_limit:
profiles = profiles['data']
for item in profiles:
name = item['resourceName']
if name not in found_resource_names:
Expand All @@ -360,9 +380,14 @@ def list_resources(self):
found_resource_names.append(name)
self.print(resources, ignore_silent=True)

def list_profiles(self, checked_out: bool = False, profile_type: Optional[str] = None):
def list_profiles(
self,
checked_out: bool = False,
profile_type: Optional[str] = None,
search_text: Optional[str] = None,
):
self.login()
self._set_available_profiles(profile_type=profile_type)
self._set_available_profiles(profile_type=profile_type, search_text=search_text)
data = []
checked_out_profiles = {}
if checked_out: # only make this call if we have to
Expand Down Expand Up @@ -390,7 +415,7 @@ def list_profiles(self, checked_out: bool = False, profile_type: Optional[str] =
'Application': profile['app_name'] or 'Resources',
'Environment': profile['env_name'],
'Profile': profile['profile_name'],
'Description': profile['profile_description'] or 'Resource',
'Description': profile['profile_description'] if profile['app_name'] else 'Resource',
'Type': profile['app_type'],
}

Expand Down Expand Up @@ -427,9 +452,9 @@ def list_profiles(self, checked_out: bool = False, profile_type: Optional[str] =
if self.output_format == 'list-profiles':
self.output_format = 'list'

def list_applications(self):
def list_applications(self, search_text: Optional[str] = None):
self.login()
self._set_available_profiles()
self._set_available_profiles(profile_type='my-access', search_text=search_text)
keys = ['app_name', 'app_type', 'app_description']
apps = []
for profile in self.available_profiles:
Expand All @@ -440,14 +465,14 @@ def list_applications(self):
row = {
'Application': app['app_name'],
'Type': app['app_type'],
'Description': app['app_description'],
'Description': app['app_description'] or '',
}
data.append(row)
self.print(data, ignore_silent=True)

def list_environments(self):
def list_environments(self, search_text: Optional[str] = None):
self.login()
self._set_available_profiles()
self._set_available_profiles(profile_type='my-access', search_text=search_text)
envs = []
keys = ['app_name', 'app_type', 'env_name', 'env_description']
for profile in self.available_profiles:
Expand All @@ -459,37 +484,27 @@ def list_environments(self):
row = {
'Application': env['app_name'],
'Environment': env['env_name'],
'Description': env['env_description'],
'Description': env['env_description'] or '',
'Type': env['app_type'],
}
data.append(row)
self.print(data, ignore_silent=True)

# temporary fix till the new API is updated to return `sessionAttributes`
def _get_missing_session_attributes(self, app_id: str, profile_id: str) -> dict:
if not self.listed_profiles:
self.listed_profiles = self.b.my_access.list_profiles()
return next(
(
profile['sessionAttributes']
for app in self.listed_profiles
if app['appContainerId'] == app_id
for profile in app.get('profiles', [])
if profile['profileId'] == profile_id
),
[],
)

def _set_available_profiles(self, from_cache_command=False, profile_type: Optional[str] = None):
def _set_available_profiles(
self,
from_cache_command=False,
profile_type: Optional[str] = None,
search_text: Optional[str] = None,
):
if not self.available_profiles:
data = []
if not profile_type or profile_type == 'my-access':
self.listed_profiles = None
access_limit = int(self.config.my_access_retrieval_limit)
increase = 0
while (access_data := self.b.my_access.list(size=access_limit + increase))['count'] > len(
access_data['accesses']
) and len({a['papId'] for a in access_data['accesses']}) < access_limit:
while (access_data := self.b.my_access.list(search_text=search_text, size=access_limit + increase))[
'count'
] > len(access_data['accesses']) and len({a['papId'] for a in access_data['accesses']}) < access_limit:
increase += max(25, round(access_data['count'] * 0.25))
apps = {a['appContainerId']: a for a in access_data.get('apps', [])}
envs = {e['environmentId']: e for e in access_data.get('environments', [])}
Expand Down Expand Up @@ -518,18 +533,15 @@ def _set_available_profiles(self, from_cache_command=False, profile_type: Option
'profile_description': profile['papDescription'],
'profile_id': profile_id,
'profile_name': profile['papName'],
'session_attributes': profile.get(
'sessionAttributes', self._get_missing_session_attributes(app_id, profile_id)
),
'session_attributes': profile['sessionAttributes'],
}
if row not in access_output:
access_output.append(row)
data += access_output[:access_limit] if access_limit else access_output
if self.b.feature_flags.get('server-access') and (not profile_type or profile_type == 'my-resources'):
if not (resource_limit := int(self.config.my_resources_retrieval_limit)):
profiles = self.b.my_resources.list_profiles()
else:
profiles = self.b.my_resources.list(size=resource_limit)
resource_limit = int(self.config.my_resources_retrieval_limit)
profiles = self.b.my_resources.list(search_text=search_text, size=resource_limit)
if resource_limit:
profiles = profiles['data']
for item in profiles:
row = {
Expand Down
59 changes: 40 additions & 19 deletions src/pybritive/commands/ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,68 @@ def ls():

@ls.command()
@build_britive
@britive_options(names='format,tenant,token,silent,passphrase,federation_provider')
def applications(ctx, output_format, tenant, token, silent, passphrase, federation_provider):
@britive_options(names='search_text,format,tenant,token,silent,passphrase,federation_provider')
def applications(ctx, search_text, output_format, tenant, token, silent, passphrase, federation_provider):
"""List applications for the currently authenticated identity."""
ctx.obj.britive.list_applications()
ctx.obj.britive.list_applications(search_text=search_text)


@ls.command()
@build_britive
@britive_options(names='format,tenant,token,silent,passphrase,federation_provider')
def environments(ctx, output_format, tenant, token, silent, passphrase, federation_provider):
def approvals(ctx, output_format, tenant, token, silent, passphrase, federation_provider):
"""List approvals for the currently authenticated identity."""
ctx.obj.britive.list_approvals()


@ls.command()
@build_britive
@britive_options(names='search_text,format,tenant,token,silent,passphrase,federation_provider')
def environments(ctx, search_text, output_format, tenant, token, silent, passphrase, federation_provider):
"""List environments for the currently authenticated identity."""
ctx.obj.britive.list_environments()
ctx.obj.britive.list_environments(search_text=search_text)


@ls.command()
@build_britive
@britive_options(names='checked_out,profile_type,output_format,tenant,token,silent,passphrase,federation_provider')
def profiles(ctx, checked_out, profile_type, output_format, tenant, token, silent, passphrase, federation_provider):
@britive_options(
names='checked_out,profile_type,search_text,output_format,tenant,token,silent,passphrase,federation_provider'
)
def profiles(
ctx,
checked_out,
profile_type,
search_text,
output_format,
tenant,
token,
silent,
passphrase,
federation_provider,
):
"""List profiles for the currently authenticated identity."""
ctx.obj.britive.list_profiles(checked_out=checked_out, profile_type=profile_type)
ctx.obj.britive.list_profiles(checked_out=checked_out, profile_type=profile_type, search_text=search_text)


@ls.command()
@build_britive
@britive_options(names='format,tenant,token,silent,passphrase,federation_provider')
def secrets(ctx, output_format, tenant, token, silent, passphrase, federation_provider):
"""List secrets for the currently authenticated identity."""
ctx.obj.britive.list_secrets()
def requests(ctx, output_format, tenant, token, silent, passphrase, federation_provider):
"""List requests for the currently authenticated identity."""
ctx.obj.britive.list_requests()


@ls.command()
@build_britive
@britive_options(names='format,tenant,token,silent,passphrase,federation_provider')
def approvals(ctx, output_format, tenant, token, silent, passphrase, federation_provider):
"""List approvals for the currently authenticated identity."""
ctx.obj.britive.list_approvals()
@britive_options(names='search_text,format,tenant,token,silent,passphrase,federation_provider')
def resources(ctx, search_text, output_format, tenant, token, silent, passphrase, federation_provider):
"""List resources for the currently authenticated identity."""
ctx.obj.britive.list_resources(search_text=search_text)


@ls.command()
@build_britive
@britive_options(names='format,tenant,token,silent,passphrase,federation_provider')
def resources(ctx, output_format, tenant, token, silent, passphrase, federation_provider):
"""List resources for the currently authenticated identity."""
ctx.obj.britive.list_resources()
@britive_options(names='search_text,format,tenant,token,silent,passphrase,federation_provider')
def secrets(ctx, search_text, output_format, tenant, token, silent, passphrase, federation_provider):
"""List secrets for the currently authenticated identity."""
ctx.obj.britive.list_secrets(search_text=search_text)
2 changes: 1 addition & 1 deletion src/pybritive/helpers/aws_credential_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def get_args():
args = {
'tenant': None,
'token': None,
'passphrase': None,
'passphrase': os.getenv('PYBRITIVE_ENCRYPTED_CREDENTIAL_PASSPHRASE'),
'force_renew': None,
'profile': None,
'federation_provider': None,
Expand Down
14 changes: 10 additions & 4 deletions src/pybritive/helpers/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import random
import time
import uuid
import webbrowser
from pathlib import Path
from typing import Optional
Expand Down Expand Up @@ -345,10 +346,15 @@ def decrypt(self, encrypted_access_token: str):
try:
return self.string_encryptor.decrypt(ciphertext=encrypted_access_token)
except InvalidPassphraseException:
self.cli.print('invalid passphrase provided - wiping credentials and forcing a re-authentication.')
self.delete()
self.credentials = self.load() or {}
return self.get_token()
try:
self.passphrase = str(uuid.getnode())
self.string_encryptor = StringEncryption(passphrase=self.passphrase)
return self.string_encryptor.decrypt(ciphertext=encrypted_access_token)
except InvalidPassphraseException:
self.cli.print('invalid passphrase provided - wiping credentials and forcing a re-authentication.')
self.delete()
self.credentials = self.load() or {}
return self.get_token()

def encrypt(self, decrypted_access_token: str):
return self.string_encryptor.encrypt(plaintext=decrypted_access_token)
Expand Down
11 changes: 9 additions & 2 deletions src/pybritive/helpers/encryption.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import base64
import hashlib
import os
import uuid
import platform
from getpass import getuser
from typing import Optional

from cryptography.fernet import Fernet, InvalidToken
Expand All @@ -15,7 +17,12 @@ class InvalidPassphraseException(Exception):

class StringEncryption:
def __init__(self, passphrase: Optional[str] = None):
self.passphrase = passphrase or str(uuid.getnode()) # TODO change?
self.passphrase = (
passphrase
or hashlib.sha256(
'|'.join([getuser(), *platform.uname()._asdict().values()]).replace(' ', '').encode('utf-8')
).hexdigest()
)

@staticmethod
def _salt():
Expand Down
8 changes: 7 additions & 1 deletion src/pybritive/helpers/k8s_exec.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from sys import argv, exit


Expand All @@ -8,7 +9,12 @@ def get_args():
argv[1:], 't:T:p:F:hv', ['tenant=', 'token=', 'passphrase=', 'federation-provider=', 'help', 'version']
)[0]

args = {'tenant': None, 'token': None, 'passphrase': None, 'federation_provider': None}
args = {
'tenant': None,
'token': None,
'passphrase': os.getenv('PYBRITIVE_ENCRYPTED_CREDENTIAL_PASSPHRASE'),
'federation_provider': None,
}

for opt, arg in options:
if opt in ('-t', '--tenant'):
Expand Down
Loading