Source code for mongodb_iam_connection_string.mics

from os import getenv
from urllib.parse import parse_qsl, quote_plus

from boto3 import Session as boto_session
from botocore.exceptions import ClientError
from yurl import URL

from .exceptions import InvalidAWSSession


[docs]class MongoDBIAMConnectionString: def __init__(self, connection_string: str, profile_name: str = 'default'): self.creds = {} self.url = URL(connection_string) self.profile = profile_name self.qs = dict(parse_qsl(self.url.query)) if self._gather_aws_creds(): self._update_url() else: raise InvalidAWSSession def _gather_aws_creds(self): if self.profile == 'default': try: _creds = boto_session().get_credentials().get_frozen_credentials() except Exception: return False else: try: _creds = boto_session( profile_name=self.profile).get_credentials().get_frozen_credentials() except Exception: return False for _ in ('access_key', 'secret_key', 'token'): if getattr(_creds, _): self.creds.update({_: str(getattr(_creds, _))}) return True def _dict2qs(self, d: dict): return "&".join([f"{k}={v}" for k, v in d.items()]) def _update_url(self): # yurl strangely parses hosts as paths, fix that if not self.url.host and self.url.path: self.url = self.url.replace(host=self.url.path, path='/') # Add proper protocol if not specified if not self.url.scheme: self.url = self.url.replace(scheme='mongodb+srv') # Add access and secret keys as username and password self.url = self.url.replace( userinfo=f"{quote_plus(self.creds['access_key'])}:{quote_plus(self.creds['secret_key'])}") # Set path to '/' if not provided if not self.url.path: self.url = self.url.replace(path='/') self.qs.update({'authSource': '$external'}) self.qs.update({'authMechanism': 'MONGODB-AWS'}) _token = self.creds.get('token') if _token: self.qs.update( {'authMechanismProperties': f"AWS_SESSION_TOKEN:{quote_plus(_token)}"}) self.url = self.url.replace(query=self._dict2qs(self.qs)) def __repr__(self): return str(self.url) def __str__(self): return str(self.url)