HEX
Server: Apache
System: Linux sh-pro142.hostgator.com.br 5.14.0-162.23.1.9991722448259.nf.el9.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Jul 31 18:11:45 UTC 2024 x86_64
User: okform09 (1324)
PHP: 8.3.30
Disabled: NONE
Upload Files
File: //usr/lib/python3.9/site-packages/dnf-plugins/osmsplugin.py
#   Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
#   Licensed under the Universal Permissive License v 1.0 as shown at https://opensource.org/licenses/UPL.

import copy
import errno
import json
import logging
import os
import sys

import dnf
from dnf.conf.config import PRIO_PLUGINCONFIG
import dnf.exceptions
from dnfpluginscore import _, logger
import librepo

sys.path.insert(0, '/usr/share/oracle-cloud-agent/plugins/osms')

from osms import actions
from osms import config
from osms.i18n import ustr
from osms.server import get_cacert, get_proxy, OsmsServer


STORED_CHANNELS_NAME = '_osms.json'
RHN_DISABLED = _("OSMS based repositories will be disabled.")
COMMUNICATION_ERROR = _("There was an error communicating with OSMS server.")
NOT_REGISTERED_ERROR = _("This system is not registered with OSMS server.")
UPDATES_FROM_OSMS = _("This system is receiving updates from OSMS server.")
GPG_KEY_REJECTED = _(
    "For security reasons packages from OSMS based repositories can be "
    "verified only with locally installed gpg keys. GPG key '%s' has been "
    "rejected."
)
PROFILE_NOT_SENT = _("Package profile information could not be sent.")
MISSING_HEADER = _("Missing required login information for OSMS: %s")
MUST_BE_ROOT = _('OSMS plugin has to be run under with the root privileges.')


class OsmsPlugin(dnf.Plugin):

    name = 'osmsplugin'

    def __init__(self, base, cli):
        super(OsmsPlugin, self).__init__(base, cli)
        self.base = base
        self.cli = cli
        self.stored_channels_path = os.path.join(self.base.conf.persistdir, STORED_CHANNELS_NAME)
        self.connected_to_osms = False
        self.up2date_cfg = {}
        self.conf = copy.copy(self.base.conf)
        self.parser = self.read_config(self.conf)
        if "main" in self.parser.sections():
            options = self.parser.items("main")
            for (key, value) in options:
                self.conf._set_value(key, value, PRIO_PLUGINCONFIG)
        if not dnf.util.am_i_root():
            logger.warning(MUST_BE_ROOT)
            self.conf.enabled = False
        if not self.conf.enabled:
            return
        if self.conf.debuglevel == 2:
            log_level = 'INFO'
        elif self.conf.debuglevel > 2:
            log_level = 'DEBUG'
        elif self.conf.debuglevel < 2:
            log_level = 'WARNING'
        init_root_logger(log_level)
        logger.debug('initialized OSMS plugin')
        self.activate_channels()

    def config(self):
        if not self.conf.enabled:
            return
        if self.cli:
            self.cli.demands.root_user = True

    def activate_channels(self, networking=True):
        enabled_channels = {}
        sslcacert = None
        force_http = 0
        proxy_url = None
        proxy_username = None
        proxy_password = None
        login_info = None
        cached_channels = self._read_channels_file()
        if not networking:
            # no network communication, use list of channels from persistdir
            enabled_channels = cached_channels
        else:
            self.up2date_cfg = config.initUp2dateConfig()
            proxy_url, proxy_username, proxy_password = get_proxy(self.up2date_cfg, external=True)
            sslcacert = get_cacert(self.up2date_cfg)
            force_http = self.up2date_cfg['useNoSSLForPackages']

            try:
                login_info = OsmsServer(timeout=self.conf.timeout).login()
            except Exception as e:
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return

            system_id = config.getSystemId()
            if not login_info or not system_id:
                logger.error("%s\n%s", NOT_REGISTERED_ERROR, RHN_DISABLED)
                self._write_channels_file({})
                return

            try:
                channels = OsmsServer(timeout=self.conf.timeout).up2date.listChannels(system_id)
            except Exception as e:
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return

            self.connected_to_osms = True
            logger.info(UPDATES_FROM_OSMS)

            for channel in channels:
                if channel['last_modified']:
                    enabled_channels[channel['label']] = dict(channel.items())
            self._write_channels_file(enabled_channels)

        repos = self.base.repos

        for channel_id, channel_dict in enabled_channels.items():
            cached_channel = cached_channels.get(channel_id)
            cached_version = None
            if cached_channel:
                cached_version = cached_channel.get('last_modified')
            conf = copy.copy(self.conf)
            if channel_id in self.parser.sections():
                options = self.parser.items(channel_id)
                for (key, value) in options:
                    conf._set_value(key, value, PRIO_PLUGINCONFIG)
            opts = {
                'conf': self.base.conf,
                'proxy': proxy_url,
                'proxy_username': proxy_username,
                'proxy_password': proxy_password,
                'timeout': conf.timeout,
                'sslcacert': sslcacert,
                'force_http': force_http,
                'cached_version': cached_version,
                'login_info': login_info,
                'gpgcheck': conf.gpgcheck,
                'enabled': conf.enabled,
            }
            repo = OsmsRepo(channel_dict, opts)
            repos.add(repo)

        logger.debug(enabled_channels)

    def transaction(self):
        """ Update system's profile after transaction. """
        if not self.conf.enabled:
            return
        if not self.connected_to_osms:
            # not connected so nothing to do here
            return
        try:
            actions.package_refresh_list(timeout=self.conf.timeout)
        except Exception as e:
            logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, PROFILE_NOT_SENT, e)

    def _read_channels_file(self):
        try:
            with open(self.stored_channels_path, "r") as channels_file:
                content = channels_file.read()
                channels = json.loads(content)
                return channels
        except (FileNotFoundError, IOError) as e:
            if e.errno != errno.ENOENT:
                raise
        except json.decoder.JSONDecodeError:
            pass  # ignore broken json and recreate it later

        return {}

    def _write_channels_file(self, var):
        try:
            with open(self.stored_channels_path, "w") as channels_file:
                json.dump(var, channels_file, indent=4)
        except (FileNotFoundError, IOError) as e:
            if e.errno != errno.ENOENT:
                raise


class OsmsRepo(dnf.repo.Repo):

    needed_headers = [
        'X-RHN-Server-Id',
        'X-RHN-Auth-User-Id',
        'X-RHN-Auth',
        'X-RHN-Auth-Server-Time',
        'X-RHN-Auth-Expire-Offset',
    ]

    def __init__(self, channel, opts):
        super(OsmsRepo, self).__init__(ustr(channel['label']), opts.get('conf'))
        self.name = ustr(channel['name'])
        self.baseurl = [url + '/GET-REQ/' + self.id for url in config.getServerlURL()]
        self.sslcacert = opts.get('sslcacert')
        self.proxy = opts.get('proxy')
        self.proxy_username = opts.get('proxy_username')
        self.proxy_password = opts.get('proxy_password')
        try:
            self.gpgkey = get_gpg_key_urls(channel['gpg_key_url'])
        except InvalidGpgKeyLocation as e:
            logger.warning(GPG_KEY_REJECTED, dnf.i18n.ucd(e))
            self.gpgkey = []
        if channel['last_modified'] != opts.get('cached_version'):
            self.metadata_expire = 1

        self.login_info = opts.get('login_info')
        self.keepalive = 0
        self.bandwidth = 0
        self.retries = 1
        self.throttle = 0
        self.timeout = opts.get('timeout')
        self.gpgcheck = opts.get('gpgcheck')
        self.force_http = opts.get('force_http')

        if self.id.startswith('ocid'):
            logger.debug("Set module_hotfixes = True for custom repository %s", self.id)
            self.module_hotfixes = True

        if opts.get('enabled'):
            self.enable()
        else:
            self.disable()

        if hasattr(self, 'set_http_headers'):
            # dnf > 4.0.9  on RHEL 8, Fedora 29/30
            http_headers = self.create_http_headers()
            if http_headers:
                self.set_http_headers(http_headers)

    def create_http_headers(self):
        http_headers = []
        for header in self.needed_headers:
            if header not in self.login_info:
                error = MISSING_HEADER % header
                raise dnf.Error.RepoError(error)
            if self.login_info[header] in (None, ''):
                # This doesn't work due to bug in librepo (or even deeper in libcurl)
                # the workaround bellow can be removed once BZ#1211662 is fixed
                # http_headers.append("%s;" % header)
                http_headers.append("%s: \r\nX-libcurl-Empty-Header-Workaround: *" % header)
            else:
                http_headers.append("%s: %s" % (header, self.login_info[header]))

        up2date_cfg = config.initUp2dateConfig()
        if up2date_cfg['tenantId']:
            http_headers.append('X-Tenant-Id: %s' % up2date_cfg['tenantId'])

        if not self.force_http:
            http_headers.append("X-RHN-Transport-Capability: follow-redirects=3")

        # libdnf will set Content-Length = '' by default. An empty string is an
        # invalid value for Content-Length header. Explicitly set Content-Length to 0.
        http_headers.append('Content-Length: 0')

        return http_headers

    def _handle_new_remote(self, destdir, mirror_setup=True):
        # this function is called only on dnf < 3.6.0 (up to Fedora 29)
        handle = super(OsmsRepo, self)._handle_new_remote(destdir, mirror_setup)
        http_headers = self.create_http_headers()
        if http_headers:
            handle.setopt(librepo.LRO_HTTPHEADER, http_headers)
        return handle


def get_gpg_key_urls(key_url_string):
    """
    Parse the key urls and validate them.

    key_url_string is a space seperated list of gpg key urls that must be
    located in /etc/pkg/rpm-gpg/.
    Return a list of strings containing the key urls.
    Raises InvalidGpgKeyLocation if any of the key urls are invalid.
    """
    key_urls = key_url_string.split()
    for key_url in key_urls:
        if not is_valid_gpg_key_url(key_url):
            raise InvalidGpgKeyLocation(key_url)
    return key_urls


class InvalidGpgKeyLocation(Exception):
    pass


def is_valid_gpg_key_url(key_url):
    proto_split = key_url.split('://')
    if len(proto_split) != 2:
        return False

    proto, path = proto_split
    if proto.lower() != 'file':
        return False

    path = os.path.normpath(path)
    if not path.startswith('/etc/pki/rpm-gpg/'):
        return False
    return True


def init_root_logger(log_level):
    # DNF doesn't setup root logger. Logs to root logger go to the console by
    # default, which interferes with dnf cli. Disable them before
    # DNF introduces a solution.
    root_logger = logging.getLogger()
    if root_logger.hasHandlers():
        return
    root_logger.propagate = 0
    root_logger.addHandler(logging.NullHandler())
    root_logger.setLevel(log_level)