Website : rimsha.abasa.com
backdoor
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
lib
/
python3
/
dist-packages
/
awscli
/
customizations
/
codeartifact
/
Filename :
login.py
back
Copy
import errno
import os
import platform
import sys
import subprocess
import re

from datetime import datetime
from dateutil.tz import tzutc
from dateutil.relativedelta import relativedelta
from botocore.utils import parse_timestamp

from awscli.compat import is_windows, urlparse, RawConfigParser, StringIO
from awscli.customizations import utils as cli_utils
from awscli.customizations.commands import BasicCommand
from awscli.customizations.utils import uni_print


def get_relative_expiration_time(remaining):
    '''Format a relative duration as a short human-readable string.

    Walks the ``years``, ``months``, ``days``, ``hours`` and ``minutes``
    attributes of ``remaining`` from largest to smallest. The first
    positive unit is emitted, followed by the immediately smaller unit
    (joined with "and") if that one is positive too; the walk then stops.

    :param remaining: object exposing the five attributes above
        (a ``relativedelta`` in this module).
    :return: e.g. ``'11 hours and 59 minutes'``, ``'1 day'``, or ``''``
        when none of the attributes is positive.
    '''
    values = []
    prev_non_zero_attr = False
    for attr in ["years", "months", "days", "hours", "minutes"]:
        value = getattr(remaining, attr)
        if value > 0:
            if prev_non_zero_attr:
                values.append("and")
            values.append(str(value))
            # Drop the plural "s" for a value of exactly one.
            values.append(attr[:-1] if value == 1 else attr)
        if prev_non_zero_attr:
            # Only the unit right after the first non-zero one is reported.
            break
        prev_non_zero_attr = value > 0

    message = " ".join(values)
    return message


class BaseLogin(object):
    '''Common state and helpers for the per-tool login adapters.

    Subclasses implement ``login()`` and, where commands are executed,
    ``get_commands()``.
    '''

    _TOOL_NOT_FOUND_MESSAGE = '%s was not found. Please verify installation.'

    def __init__(self, auth_token, expiration, repository_endpoint,
                 domain, repository, subprocess_utils, namespace=None):
        '''Store the login parameters.

        :param subprocess_utils: object providing ``check_call``,
            ``check_output`` and ``PIPE`` (the ``subprocess`` module when
            invoked from ``CodeArtifactLogin``).
        '''
        self.auth_token = auth_token
        self.expiration = expiration
        self.repository_endpoint = repository_endpoint
        self.domain = domain
        self.repository = repository
        self.subprocess_utils = subprocess_utils
        self.namespace = namespace

    def login(self, dry_run=False):
        raise NotImplementedError('login()')

    def _dry_run_commands(self, tool, commands):
        '''Print each command on its own line followed by a blank line.'''
        for command in commands:
            sys.stdout.write(' '.join(command))
            sys.stdout.write(os.linesep)
            sys.stdout.write(os.linesep)

    def _write_success_message(self, tool):
        '''Print the success banner and the relative/absolute expiration.'''
        # add extra 30 seconds make expiration more reasonable
        # for some corner case
        # e.g. 11 hours 59 minutes 31 seconds should output --> 12 hours.
        remaining = relativedelta(
            self.expiration, datetime.now(tzutc())) + relativedelta(seconds=30)
        expiration_message = get_relative_expiration_time(remaining)

        sys.stdout.write('Successfully configured {} to use '
                         'AWS CodeArtifact repository {} '
                         .format(tool, self.repository_endpoint))
        sys.stdout.write(os.linesep)
        sys.stdout.write('Login expires in {} at {}'.format(
            expiration_message, self.expiration))
        sys.stdout.write(os.linesep)

    def _run_commands(self, tool, commands, dry_run=False):
        '''Run (or, in dry-run mode, print) the given commands.

        :raises ValueError: if the tool executable cannot be found.
        '''
        if dry_run:
            self._dry_run_commands(tool, commands)
            return

        for command in commands:
            try:
                # NOTE(review): the subprocess documentation advises against
                # PIPE with check_call because the pipes are never read;
                # kept as-is to preserve the injected-object contract.
                self.subprocess_utils.check_call(
                    command,
                    stdout=self.subprocess_utils.PIPE,
                    stderr=self.subprocess_utils.PIPE,
                )
            except OSError as ex:
                if ex.errno == errno.ENOENT:
                    raise ValueError(
                        self._TOOL_NOT_FOUND_MESSAGE % tool
                    )
                raise

        self._write_success_message(tool)

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        raise NotImplementedError('get_commands()')


class NuGetBaseLogin(BaseLogin):
    '''Shared login flow for the ``nuget`` and ``dotnet`` CLIs.

    Lists the configured sources, then adds a new source or updates the
    matching existing one with the CodeArtifact credentials.
    '''

    _NUGET_INDEX_URL_FMT = '{endpoint}v3/index.json'

    # When adding new sources we can specify that we added the source to the
    # user level NuGet.Config file. However, when updating an existing source
    # we cannot be specific about which level NuGet.Config file was updated
    # because it is possible that the existing source was not in the user
    # level NuGet.Config. The source listing command returns all configured
    # sources from all NuGet.Config levels. The update command updates the
    # source in whichever NuGet.Config file the source was found.
    _SOURCE_ADDED_MESSAGE = 'Added source %s to the user level NuGet.Config\n'
    _SOURCE_UPDATED_MESSAGE = 'Updated source %s in the NuGet.Config\n'

    def login(self, dry_run=False):
        '''Add or update the CodeArtifact source in NuGet.Config.

        :raises ValueError: if the tool executable cannot be found.
        :raises subprocess.CalledProcessError: if the configure command
            fails.
        '''
        try:
            source_to_url_dict = self._get_source_to_url_dict()
        except OSError as ex:
            if ex.errno == errno.ENOENT:
                raise ValueError(
                    self._TOOL_NOT_FOUND_MESSAGE % self._get_tool_name()
                )
            raise

        nuget_index_url = self._NUGET_INDEX_URL_FMT.format(
            endpoint=self.repository_endpoint
        )
        source_name, already_exists = self._get_source_name(
            nuget_index_url, source_to_url_dict
        )

        if already_exists:
            command = self._get_configure_command(
                'update', nuget_index_url, source_name
            )
            source_configured_message = self._SOURCE_UPDATED_MESSAGE
        else:
            command = self._get_configure_command('add', nuget_index_url,
                                                  source_name)
            source_configured_message = self._SOURCE_ADDED_MESSAGE

        if dry_run:
            dry_run_command = ' '.join([str(cd) for cd in command])
            uni_print(dry_run_command)
            uni_print('\n')
            return

        try:
            self.subprocess_utils.check_output(
                command,
                stderr=self.subprocess_utils.PIPE
            )
        except subprocess.CalledProcessError:
            uni_print('Failed to update the NuGet.Config\n')
            raise

        uni_print(source_configured_message % source_name)
        self._write_success_message('nuget')

    def _get_source_to_url_dict(self):
        '''Return a dict mapping configured source names to their URLs.'''
        # The response from listing sources takes the following form:
        #
        # Registered Sources:
        #   1.  Source Name 1 [Enabled]
        #       https://source1.com/index.json
        #   2.  Source Name 2 [Disabled]
        #       https://source2.com/index.json
        # ...
        #   100. Source Name 100
        #       https://source100.com/index.json

        # Or it can be (blank line after Registered Sources:)

        # Registered Sources:

        #   1.  Source Name 1 [Enabled]
        #       https://source1.com/index.json
        #   2.  Source Name 2 [Disabled]
        #       https://source2.com/index.json
        # ...
        #   100. Source Name 100
        #       https://source100.com/index.json
        response = self.subprocess_utils.check_output(
            self._get_list_command(),
            stderr=self.subprocess_utils.PIPE
        )

        lines = response.decode("utf-8").splitlines()
        lines = [line for line in lines if line.strip() != '']

        source_to_url_dict = {}
        # Skip the header (index 0) and consume (name, url) line pairs.
        # Stopping at len(lines) - 1 ignores an unpaired trailing line
        # instead of raising IndexError on lines[i + 1].
        for i in range(1, len(lines) - 1, 2):
            source_to_url_dict[self._parse_source_name(lines[i])] = \
                self._parse_source_url(lines[i + 1])

        return source_to_url_dict

    def _parse_source_name(self, line):
        '''Extract the source name from a "N. Name [Tag]" line.'''
        # A source name line takes the following form:
        # 1. NuGet Source [Enabled]

        # Remove the Enabled/Disabled tag.
        line_without_tag = line.strip().rsplit(' [', 1)[0]

        # Remove the leading number.
        return line_without_tag.split(None, 1)[1]

    def _parse_source_url(self, line):
        '''Return the URL line with surrounding whitespace removed.'''
        # A source url line takes the following form:
        # https://source.com/index.json
        return line.strip()

    def _get_source_name(self, codeartifact_url, source_dict):
        '''Pick the source name to configure.

        :return: ``(source_name, already_exists)``.
        '''
        default_name = '{}/{}'.format(self.domain, self.repository)

        # Check if the CodeArtifact URL is already present in the
        # NuGet.Config file. If the URL already exists, use the source name
        # already assigned to the CodeArtifact URL.
        for source_name, source_url in source_dict.items():
            if source_url == codeartifact_url:
                return source_name, True

        # If the CodeArtifact URL is not present in the NuGet.Config file,
        # check if the default source name already exists so we can know
        # whether we need to add a new entry or update the existing entry.
        if default_name in source_dict:
            return default_name, True

        # If neither the source url nor the source name already exist in the
        # NuGet.Config file, use the default source name.
        return default_name, False

    def _get_tool_name(self):
        raise NotImplementedError('_get_tool_name()')

    def _get_list_command(self):
        raise NotImplementedError('_get_list_command()')

    def _get_configure_command(self, operation, nuget_index_url, source_name):
        raise NotImplementedError('_get_configure_command()')


class NuGetLogin(NuGetBaseLogin):
    '''Login adapter that drives the ``nuget`` executable.'''

    def _get_tool_name(self):
        return 'nuget'

    def _get_list_command(self):
        return ['nuget', 'sources', 'list', '-format', 'detailed']

    def _get_configure_command(self, operation, nuget_index_url, source_name):
        return [
            'nuget', 'sources', operation,
            '-name', source_name,
            '-source', nuget_index_url,
            '-username', 'aws',
            '-password', self.auth_token
        ]


class DotNetLogin(NuGetBaseLogin):
    '''Login adapter that drives ``dotnet nuget``.'''

    def _get_tool_name(self):
        return 'dotnet'

    def _get_list_command(self):
        return ['dotnet', 'nuget', 'list', 'source', '--format', 'detailed']

    def _get_configure_command(self, operation, nuget_index_url, source_name):
        command = ['dotnet', 'nuget', operation, 'source']

        # "add" takes the URL positionally and the name as an option;
        # any other operation takes the name positionally.
        if operation == 'add':
            command.append(nuget_index_url)
            command += ['--name', source_name]
        else:
            command.append(source_name)
            command += ['--source', nuget_index_url]

        command += [
            '--username', 'aws',
            '--password', self.auth_token
        ]

        # Encryption is not supported on non-Windows platforms.
        if not is_windows:
            command.append('--store-password-in-clear-text')

        return command


class NpmLogin(BaseLogin):
    '''Login adapter that configures npm via ``npm config set``.'''

    # On Windows we need to be explicit about the .cmd file to execute
    # (unless we execute through the shell, i.e. with shell=True).
    NPM_CMD = 'npm.cmd' if platform.system().lower() == 'windows' else 'npm'

    def login(self, dry_run=False):
        scope = self.get_scope(
            self.namespace
        )
        commands = self.get_commands(
            self.repository_endpoint, self.auth_token, scope=scope
        )
        self._run_commands('npm', commands, dry_run)

    @classmethod
    def get_scope(cls, namespace):
        '''Return ``namespace`` as an ``@``-prefixed npm scope.

        :return: ``None`` when ``namespace`` is ``None``.
        :raises ValueError: if the scope does not match the scope pattern.
        '''
        # Regex for valid scope name
        # NOTE(review): the pattern is anchored only at the start, so
        # match() validates a prefix of the scope rather than all of it.
        valid_scope_name = re.compile('^(@[a-z0-9-~][a-z0-9-._~]*)')

        if namespace is None:
            return namespace

        # Add @ prefix to scope if it doesn't exist
        if namespace.startswith('@'):
            scope = namespace
        else:
            scope = '@{}'.format(namespace)

        if not valid_scope_name.match(scope):
            raise ValueError(
                'Invalid scope name, scope must contain URL-safe '
                'characters, no leading dots or underscores'
            )

        return scope

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        '''Build the ``npm config set`` commands for the repository.

        :param kwargs: optional ``scope`` used to prefix the registry key.
        :return: list of argv lists.
        '''
        commands = []
        scope = kwargs.get('scope')

        # prepend scope if it exists
        registry = '{}:registry'.format(scope) if scope else 'registry'

        # set up the codeartifact repository as the npm registry.
        commands.append(
            [cls.NPM_CMD, 'config', 'set', registry, endpoint]
        )

        repo_uri = urlparse.urlsplit(endpoint)

        # configure npm to always require auth for the repository.
        always_auth_config = '//{}{}:always-auth'.format(
            repo_uri.netloc, repo_uri.path
        )
        commands.append(
            [cls.NPM_CMD, 'config', 'set', always_auth_config, 'true']
        )

        # set auth info for the repository.
        auth_token_config = '//{}{}:_authToken'.format(
            repo_uri.netloc, repo_uri.path
        )
        commands.append(
            [cls.NPM_CMD, 'config', 'set', auth_token_config, auth_token]
        )

        return commands


class PipLogin(BaseLogin):
    '''Login adapter that sets pip's ``global.index-url``.'''

    PIP_INDEX_URL_FMT = '{scheme}://aws:{auth_token}@{netloc}{path}simple/'

    def login(self, dry_run=False):
        commands = self.get_commands(
            self.repository_endpoint, self.auth_token
        )
        self._run_commands('pip', commands, dry_run)

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        '''Return the single ``pip config set`` command as a list of argv.'''
        repo_uri = urlparse.urlsplit(endpoint)
        pip_index_url = cls.PIP_INDEX_URL_FMT.format(
            scheme=repo_uri.scheme,
            auth_token=auth_token,
            netloc=repo_uri.netloc,
            path=repo_uri.path
        )

        return [['pip', 'config', 'set', 'global.index-url', pip_index_url]]


class TwineLogin(BaseLogin):
    '''Login adapter that writes CodeArtifact credentials to a pypirc file.'''

    DEFAULT_PYPI_RC_FMT = u'''\
[distutils]
index-servers=
    pypi
    codeartifact

[codeartifact]
repository: {repository_endpoint}
username: aws
password: {auth_token}'''

    def __init__(
        self,
        auth_token,
        expiration,
        repository_endpoint,
        domain,
        repository,
        subprocess_utils,
        pypi_rc_path=None,
        namespace=None
    ):
        '''Store the login parameters and resolve the pypirc location.

        :param pypi_rc_path: file to write; defaults to
            ``get_pypi_rc_path()`` when ``None``.
        :param namespace: accepted so every login class can be constructed
            with the same keyword arguments; forwarded to ``BaseLogin``.
        '''
        if pypi_rc_path is None:
            pypi_rc_path = self.get_pypi_rc_path()
        self.pypi_rc_path = pypi_rc_path
        super(TwineLogin, self).__init__(
            auth_token, expiration, repository_endpoint,
            domain, repository, subprocess_utils, namespace)

    @classmethod
    def get_commands(cls, endpoint, auth_token, **kwargs):
        '''Return the pypirc content (a string, not a command list).

        :param kwargs: must contain ``pypi_rc_path``.
        :return: the serialized configuration with the ``codeartifact``
            server added or refreshed.
        '''
        # TODO(ujjwalpa@): We don't really have a command to execute for Twine
        # as we directly write to the pypirc file (or to stdout for dryrun)
        # with python itself instead. Nevertheless, we're using this method for
        # testing so we'll keep the interface for now but return a string with
        # the expected pypirc content instead of a list of commands to
        # execute. This definitely reeks of code smell and there is probably
        # room for rethinking and refactoring the interfaces of these adapter
        # helper classes in the future.

        assert 'pypi_rc_path' in kwargs, 'pypi_rc_path must be provided.'
        pypi_rc_path = kwargs['pypi_rc_path']

        default_pypi_rc = cls.DEFAULT_PYPI_RC_FMT.format(
            repository_endpoint=endpoint,
            auth_token=auth_token
        )

        pypi_rc = RawConfigParser()
        if os.path.exists(pypi_rc_path):
            try:
                pypi_rc.read(pypi_rc_path)
                index_servers = pypi_rc.get('distutils', 'index-servers')
                servers = [
                    server.strip()
                    for server in index_servers.split('\n')
                    if server.strip() != ''
                ]

                if 'codeartifact' not in servers:
                    servers.append('codeartifact')
                    pypi_rc.set(
                        'distutils', 'index-servers', '\n' + '\n'.join(servers)
                    )

                if 'codeartifact' not in pypi_rc.sections():
                    pypi_rc.add_section('codeartifact')

                pypi_rc.set('codeartifact', 'repository', endpoint)
                pypi_rc.set('codeartifact', 'username', 'aws')
                pypi_rc.set('codeartifact', 'password', auth_token)
            except Exception:  # invalid .pypirc file
                sys.stdout.write('%s is in an invalid state.' % pypi_rc_path)
                sys.stdout.write(os.linesep)
                raise
        else:
            # readfp() was removed from configparser in Python 3.12; prefer
            # read_string() and fall back only where it does not exist.
            if hasattr(pypi_rc, 'read_string'):
                pypi_rc.read_string(default_pypi_rc)
            else:
                pypi_rc.readfp(StringIO(default_pypi_rc))

        pypi_rc_stream = StringIO()
        pypi_rc.write(pypi_rc_stream)
        pypi_rc_str = pypi_rc_stream.getvalue()
        pypi_rc_stream.close()

        return pypi_rc_str

    def login(self, dry_run=False):
        '''Write the pypirc file, or print its would-be content on dry run.'''
        # No command to execute for Twine, we get the expected pypirc content
        # instead.
        pypi_rc_str = self.get_commands(
            self.repository_endpoint,
            self.auth_token,
            pypi_rc_path=self.pypi_rc_path
        )

        if dry_run:
            sys.stdout.write('Dryrun mode is enabled, not writing to pypirc.')
            sys.stdout.write(os.linesep)
            sys.stdout.write(
                '%s would have been set to the following:' % self.pypi_rc_path
            )
            sys.stdout.write(os.linesep)
            sys.stdout.write(os.linesep)
            sys.stdout.write(pypi_rc_str)
            sys.stdout.write(os.linesep)
        else:
            with open(self.pypi_rc_path, 'w+') as fp:
                fp.write(pypi_rc_str)

            self._write_success_message('twine')

    @classmethod
    def get_pypi_rc_path(cls):
        '''Return the path of ``.pypirc`` in the user's home directory.'''
        return os.path.join(os.path.expanduser("~"), ".pypirc")


class CodeArtifactLogin(BasicCommand):
    '''Log in to the idiomatic tool for the requested package format.'''

    TOOL_MAP = {
        'nuget': {
            'package_format': 'nuget',
            'login_cls': NuGetLogin,
            'namespace_support': False,
        },
        'dotnet': {
            'package_format': 'nuget',
            'login_cls': DotNetLogin,
            'namespace_support': False,
        },
        'npm': {
            'package_format': 'npm',
            'login_cls': NpmLogin,
            'namespace_support': True,
        },
        'pip': {
            'package_format': 'pypi',
            'login_cls': PipLogin,
            'namespace_support': False,
        },
        'twine': {
            'package_format': 'pypi',
            'login_cls': TwineLogin,
            'namespace_support': False,
        }
    }

    NAME = 'login'

    DESCRIPTION = (
        'Sets up the idiomatic tool for your package format to use your '
        'CodeArtifact repository. Your login information is valid for up '
        'to 12 hours after which you must login again.'
    )

    ARG_TABLE = [
        {
            'name': 'tool',
            'help_text': 'The tool you want to connect with your repository',
            'choices': list(TOOL_MAP.keys()),
            'required': True,
        },
        {
            'name': 'domain',
            'help_text': 'Your CodeArtifact domain name',
            'required': True,
        },
        {
            'name': 'domain-owner',
            'help_text': 'The AWS account ID that owns your CodeArtifact '
                         'domain',
            'required': False,
        },
        {
            'name': 'namespace',
            'help_text': 'Associates a namespace with your repository tool',
            'required': False,
        },
        {
            'name': 'duration-seconds',
            'cli_type_name': 'integer',
            'help_text': 'The time, in seconds, that the login information '
                         'is valid',
            'required': False,
        },
        {
            'name': 'repository',
            'help_text': 'Your CodeArtifact repository name',
            'required': True,
        },
        {
            'name': 'dry-run',
            'action': 'store_true',
            'help_text': 'Only print the commands that would be executed '
                         'to connect your tool with your repository without '
                         'making any changes to your configuration',
            'required': False,
            'default': False
        },
    ]

    def _get_namespace(self, tool, parsed_args):
        '''Return the requested namespace.

        :raises ValueError: if a namespace was given for a tool whose
            ``TOOL_MAP`` entry has ``namespace_support`` set to False.
        '''
        namespace_compatible = self.TOOL_MAP[tool]['namespace_support']

        if not namespace_compatible and parsed_args.namespace:
            raise ValueError(
                'Argument --namespace is not supported for {}'.format(tool)
            )
        return parsed_args.namespace

    def _get_repository_endpoint(
        self, codeartifact_client, parsed_args, package_format
    ):
        '''Call ``get_repository_endpoint`` and return the endpoint URL.'''
        kwargs = {
            'domain': parsed_args.domain,
            'repository': parsed_args.repository,
            'format': package_format
        }
        if parsed_args.domain_owner:
            kwargs['domainOwner'] = parsed_args.domain_owner

        get_repository_endpoint_response = \
            codeartifact_client.get_repository_endpoint(**kwargs)

        return get_repository_endpoint_response['repositoryEndpoint']

    def _get_authorization_token(self, codeartifact_client, parsed_args):
        '''Call ``get_authorization_token`` and return the raw response.'''
        kwargs = {
            'domain': parsed_args.domain
        }
        if parsed_args.domain_owner:
            kwargs['domainOwner'] = parsed_args.domain_owner

        if parsed_args.duration_seconds:
            kwargs['durationSeconds'] = parsed_args.duration_seconds

        get_authorization_token_response = \
            codeartifact_client.get_authorization_token(**kwargs)

        return get_authorization_token_response

    def _run_main(self, parsed_args, parsed_globals):
        '''Fetch a token and endpoint, then run the selected tool's login.

        :return: ``0`` on success.
        '''
        tool = parsed_args.tool.lower()

        package_format = self.TOOL_MAP[tool]['package_format']

        codeartifact_client = cli_utils.create_client_from_parsed_globals(
            self._session, 'codeartifact', parsed_globals
        )

        auth_token_res = self._get_authorization_token(
            codeartifact_client, parsed_args
        )

        repository_endpoint = self._get_repository_endpoint(
            codeartifact_client, parsed_args, package_format
        )

        domain = parsed_args.domain
        repository = parsed_args.repository
        namespace = self._get_namespace(tool, parsed_args)

        auth_token = auth_token_res['authorizationToken']
        expiration = parse_timestamp(auth_token_res['expiration'])
        # namespace is passed by keyword: the seventh positional parameter
        # of TwineLogin.__init__ is pypi_rc_path, not namespace.
        login = self.TOOL_MAP[tool]['login_cls'](
            auth_token, expiration, repository_endpoint,
            domain, repository, subprocess, namespace=namespace
        )

        login.login(parsed_args.dry_run)

        return 0