File: //usr/lib/python3.9/site-packages/dnf-plugins/osmsplugin.py
# Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://opensource.org/licenses/UPL.
import copy
import errno
import json
import logging
import os
import sys
import dnf
from dnf.conf.config import PRIO_PLUGINCONFIG
import dnf.exceptions
from dnfpluginscore import _, logger
import librepo
sys.path.insert(0, '/usr/share/oracle-cloud-agent/plugins/osms')
from osms import actions
from osms import config
from osms.i18n import ustr
from osms.server import get_cacert, get_proxy, OsmsServer
# Name of the JSON cache file (stored in dnf's persistdir) that remembers
# which OSMS channels were enabled on the last successful run.
STORED_CHANNELS_NAME = '_osms.json'
# Translatable, user-facing messages logged by the plugin.
RHN_DISABLED = _("OSMS based repositories will be disabled.")
COMMUNICATION_ERROR = _("There was an error communicating with OSMS server.")
NOT_REGISTERED_ERROR = _("This system is not registered with OSMS server.")
UPDATES_FROM_OSMS = _("This system is receiving updates from OSMS server.")
GPG_KEY_REJECTED = _(
"For security reasons packages from OSMS based repositories can be "
"verified only with locally installed gpg keys. GPG key '%s' has been "
"rejected."
)
PROFILE_NOT_SENT = _("Package profile information could not be sent.")
MISSING_HEADER = _("Missing required login information for OSMS: %s")
MUST_BE_ROOT = _('OSMS plugin has to be run under with the root privileges.')
class OsmsPlugin(dnf.Plugin):
    """DNF plugin exposing OSMS (OS Management Service) channels as repos.

    On construction it reads the plugin configuration, contacts the OSMS
    server for the list of channels enabled for this system and registers
    one OsmsRepo per channel.  After a successful dnf transaction it pushes
    a refreshed package profile back to the server.
    """
    name = 'osmsplugin'
    def __init__(self, base, cli):
        """Load configuration, initialise logging and activate channels.

        :param base: the dnf.Base instance this plugin is attached to
        :param cli: the dnf CLI object, or None when used through the API
        """
        super(OsmsPlugin, self).__init__(base, cli)
        self.base = base
        self.cli = cli
        # Path of the cached channel list from the previous run.
        self.stored_channels_path = os.path.join(self.base.conf.persistdir, STORED_CHANNELS_NAME)
        self.connected_to_osms = False
        self.up2date_cfg = {}
        # Work on a copy of the main dnf config so that overrides coming
        # from the plugin config file do not leak back into base.conf.
        self.conf = copy.copy(self.base.conf)
        self.parser = self.read_config(self.conf)
        if "main" in self.parser.sections():
            options = self.parser.items("main")
            for (key, value) in options:
                self.conf._set_value(key, value, PRIO_PLUGINCONFIG)
        if not dnf.util.am_i_root():
            # Channel activation writes state under persistdir; disable the
            # plugin entirely for non-root users.
            logger.warning(MUST_BE_ROOT)
            self.conf.enabled = False
        if not self.conf.enabled:
            return
        # Map dnf's numeric debuglevel onto a stdlib logging level name.
        if self.conf.debuglevel == 2:
            log_level = 'INFO'
        elif self.conf.debuglevel > 2:
            log_level = 'DEBUG'
        elif self.conf.debuglevel < 2:
            log_level = 'WARNING'
        init_root_logger(log_level)
        logger.debug('initialized OSMS plugin')
        self.activate_channels()
    def config(self):
        """DNF hook: demand root for CLI runs while the plugin is enabled."""
        if not self.conf.enabled:
            return
        if self.cli:
            self.cli.demands.root_user = True
    def activate_channels(self, networking=True):
        """Register one dnf repository per enabled OSMS channel.

        :param networking: when False, skip all server communication and
            reuse the channel list cached in persistdir instead
        """
        enabled_channels = {}
        sslcacert = None
        force_http = 0
        proxy_url = None
        proxy_username = None
        proxy_password = None
        login_info = None
        cached_channels = self._read_channels_file()
        if not networking:
            # no network communication, use list of channels from persistdir
            enabled_channels = cached_channels
        else:
            self.up2date_cfg = config.initUp2dateConfig()
            proxy_url, proxy_username, proxy_password = get_proxy(self.up2date_cfg, external=True)
            sslcacert = get_cacert(self.up2date_cfg)
            force_http = self.up2date_cfg['useNoSSLForPackages']
            try:
                login_info = OsmsServer(timeout=self.conf.timeout).login()
            except Exception as e:
                # Best effort: on any server error log and leave the OSMS
                # repos disabled rather than breaking the dnf run.
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return
            system_id = config.getSystemId()
            if not login_info or not system_id:
                logger.error("%s\n%s", NOT_REGISTERED_ERROR, RHN_DISABLED)
                self._write_channels_file({})
                return
            try:
                channels = OsmsServer(timeout=self.conf.timeout).up2date.listChannels(system_id)
            except Exception as e:
                logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e)
                return
            self.connected_to_osms = True
            logger.info(UPDATES_FROM_OSMS)
            # Keep only channels that carry content (truthy last_modified)
            # and refresh the on-disk cache with them.
            for channel in channels:
                if channel['last_modified']:
                    enabled_channels[channel['label']] = dict(channel.items())
            self._write_channels_file(enabled_channels)
        repos = self.base.repos
        for channel_id, channel_dict in enabled_channels.items():
            # The cached last_modified lets OsmsRepo decide whether its
            # metadata must be refreshed immediately.
            cached_channel = cached_channels.get(channel_id)
            cached_version = None
            if cached_channel:
                cached_version = cached_channel.get('last_modified')
            # Per-channel copy of the config so a [channel_id] section in the
            # plugin config file can override individual options.
            conf = copy.copy(self.conf)
            if channel_id in self.parser.sections():
                options = self.parser.items(channel_id)
                for (key, value) in options:
                    conf._set_value(key, value, PRIO_PLUGINCONFIG)
            # NOTE: the repo parent config is base.conf; the per-channel
            # overrides are passed individually (timeout/gpgcheck/enabled).
            opts = {
                'conf': self.base.conf,
                'proxy': proxy_url,
                'proxy_username': proxy_username,
                'proxy_password': proxy_password,
                'timeout': conf.timeout,
                'sslcacert': sslcacert,
                'force_http': force_http,
                'cached_version': cached_version,
                'login_info': login_info,
                'gpgcheck': conf.gpgcheck,
                'enabled': conf.enabled,
            }
            repo = OsmsRepo(channel_dict, opts)
            repos.add(repo)
        logger.debug(enabled_channels)
    def transaction(self):
        """ Update system's profile after transaction. """
        if not self.conf.enabled:
            return
        if not self.connected_to_osms:
            # not connected so nothing to do here
            return
        try:
            actions.package_refresh_list(timeout=self.conf.timeout)
        except Exception as e:
            # Profile upload is best effort; never fail the transaction.
            logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, PROFILE_NOT_SENT, e)
    def _read_channels_file(self):
        """Return the cached channel dict, or {} if absent or unreadable."""
        try:
            with open(self.stored_channels_path, "r") as channels_file:
                content = channels_file.read()
                channels = json.loads(content)
                return channels
        except (FileNotFoundError, IOError) as e:
            # A missing cache file is normal (first run); anything else
            # is a real I/O problem and is propagated.
            if e.errno != errno.ENOENT:
                raise
        except json.decoder.JSONDecodeError:
            pass  # ignore broken json and recreate it later
        return {}
    def _write_channels_file(self, var):
        """Persist *var* (the enabled-channels dict) as pretty-printed JSON."""
        try:
            with open(self.stored_channels_path, "w") as channels_file:
                json.dump(var, channels_file, indent=4)
        except (FileNotFoundError, IOError) as e:
            # Tolerate a missing persistdir; re-raise other I/O errors.
            if e.errno != errno.ENOENT:
                raise
class OsmsRepo(dnf.repo.Repo):
    """A dnf repository backed by an OSMS channel.

    Points the repository baseurl(s) at the channel's package endpoint on
    the OSMS server(s) and attaches the RHN-style authentication headers
    obtained at login time to every HTTP request.
    """
    # Login headers that must be present (possibly with an empty value)
    # in login_info and forwarded on every request.
    needed_headers = [
        'X-RHN-Server-Id',
        'X-RHN-Auth-User-Id',
        'X-RHN-Auth',
        'X-RHN-Auth-Server-Time',
        'X-RHN-Auth-Expire-Offset',
    ]
    def __init__(self, channel, opts):
        """Create a repository for *channel*.

        :param channel: dict describing the OSMS channel (keys used here:
            label, name, gpg_key_url, last_modified)
        :param opts: option dict assembled by OsmsPlugin.activate_channels
            (conf, proxy settings, sslcacert, timeout, login_info, ...)
        """
        super(OsmsRepo, self).__init__(ustr(channel['label']), opts.get('conf'))
        self.name = ustr(channel['name'])
        self.baseurl = [url + '/GET-REQ/' + self.id for url in config.getServerlURL()]
        self.sslcacert = opts.get('sslcacert')
        self.proxy = opts.get('proxy')
        self.proxy_username = opts.get('proxy_username')
        self.proxy_password = opts.get('proxy_password')
        try:
            self.gpgkey = get_gpg_key_urls(channel['gpg_key_url'])
        except InvalidGpgKeyLocation as e:
            # Only keys installed under /etc/pki/rpm-gpg/ are trusted;
            # reject anything else and continue without gpg keys.
            logger.warning(GPG_KEY_REJECTED, dnf.i18n.ucd(e))
            self.gpgkey = []
        if channel['last_modified'] != opts.get('cached_version'):
            # Channel changed on the server since our cached snapshot:
            # expire the metadata almost immediately to force a refresh.
            self.metadata_expire = 1
        self.login_info = opts.get('login_info')
        self.keepalive = 0
        self.bandwidth = 0
        self.retries = 1
        self.throttle = 0
        self.timeout = opts.get('timeout')
        self.gpgcheck = opts.get('gpgcheck')
        self.force_http = opts.get('force_http')
        if self.id.startswith('ocid'):
            logger.debug("Set module_hotfixes = True for custom repository %s", self.id)
            self.module_hotfixes = True
        if opts.get('enabled'):
            self.enable()
        else:
            self.disable()
        if hasattr(self, 'set_http_headers'):
            # dnf > 4.0.9 on RHEL 8, Fedora 29/30
            http_headers = self.create_http_headers()
            if http_headers:
                self.set_http_headers(http_headers)
    def create_http_headers(self):
        """Build the list of extra HTTP headers for this repository.

        :returns: list of "Name: value" header strings
        :raises dnf.exceptions.RepoError: when a required login header is
            missing from login_info
        """
        http_headers = []
        for header in self.needed_headers:
            if header not in self.login_info:
                error = MISSING_HEADER % header
                # BUGFIX: was `dnf.Error.RepoError`, an attribute path that
                # does not exist in dnf and would raise AttributeError
                # instead of the intended repository error.
                raise dnf.exceptions.RepoError(error)
            if self.login_info[header] in (None, ''):
                # This doesn't work due to bug in librepo (or even deeper in libcurl)
                # the workaround below can be removed once BZ#1211662 is fixed
                # http_headers.append("%s;" % header)
                http_headers.append("%s: \r\nX-libcurl-Empty-Header-Workaround: *" % header)
            else:
                http_headers.append("%s: %s" % (header, self.login_info[header]))
        up2date_cfg = config.initUp2dateConfig()
        if up2date_cfg['tenantId']:
            http_headers.append('X-Tenant-Id: %s' % up2date_cfg['tenantId'])
        if not self.force_http:
            http_headers.append("X-RHN-Transport-Capability: follow-redirects=3")
        # libdnf will set Content-Length = '' by default. An empty string is an
        # invalid value for Content-Length header. Explicitly set Content-Length to 0.
        http_headers.append('Content-Length: 0')
        return http_headers
    def _handle_new_remote(self, destdir, mirror_setup=True):
        """Attach the OSMS auth headers to the new librepo handle."""
        # this function is called only on dnf < 3.6.0 (up to Fedora 29)
        handle = super(OsmsRepo, self)._handle_new_remote(destdir, mirror_setup)
        http_headers = self.create_http_headers()
        if http_headers:
            handle.setopt(librepo.LRO_HTTPHEADER, http_headers)
        return handle
def get_gpg_key_urls(key_url_string):
    """
    Split *key_url_string* into gpg key urls and validate each of them.

    key_url_string is a whitespace-separated list of gpg key urls; every
    url must be a file:// location under /etc/pki/rpm-gpg/.
    Return the list of url strings.
    Raises InvalidGpgKeyLocation if any url fails validation.
    """
    urls = key_url_string.split()
    rejected = [url for url in urls if not is_valid_gpg_key_url(url)]
    if rejected:
        raise InvalidGpgKeyLocation(rejected[0])
    return urls
class InvalidGpgKeyLocation(Exception):
    """Raised when a gpg key url is not a file:// path under /etc/pki/rpm-gpg/."""
def is_valid_gpg_key_url(key_url):
    """Return True iff *key_url* is a file:// url under /etc/pki/rpm-gpg/.

    The path component is normalised before the prefix check, so '..'
    segments cannot escape the trusted key directory.
    """
    pieces = key_url.split('://')
    if len(pieces) != 2:
        return False
    scheme, location = pieces
    return (scheme.lower() == 'file'
            and os.path.normpath(location).startswith('/etc/pki/rpm-gpg/'))
def init_root_logger(log_level):
    """Silence the root logger unless someone has already configured it.

    DNF does not set up the root logger, so records propagated to it go to
    the console by default and interfere with the dnf CLI output.  Attach a
    NullHandler (and stop propagation) as long as no handlers are installed
    yet; once DNF provides a solution of its own this can be dropped.
    """
    root = logging.getLogger()
    if not root.hasHandlers():
        root.propagate = 0
        root.addHandler(logging.NullHandler())
        root.setLevel(log_level)