| Server IP : 170.10.162.208 / Your IP : 216.73.216.181 Web Server : LiteSpeed System : Linux altar19.supremepanel19.com 4.18.0-553.69.1.lve.el8.x86_64 #1 SMP Wed Aug 13 19:53:59 UTC 2025 x86_64 User : deltahospital ( 1806) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /tmp/ |
Upload File : |
addr.h 0000644 00000002017 15051120572 0005625 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2008-2009 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI address helper declarations. */
#ifndef __NETLINK_CLI_ADDR_H_
#define __NETLINK_CLI_ADDR_H_
#include <netlink/route/addr.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Expands to a call of nl_cli_alloc_cache() with the socket argument, the
 * label "address" and rtnl_addr_alloc_cache as the allocation callback.
 * NOTE(review): nl_cli_alloc_cache() is not declared by this header or by
 * the include above as far as this file shows; users presumably also need
 * <netlink/cli/utils.h> -- confirm.
 */
#define nl_cli_addr_alloc_cache(sk) \
nl_cli_alloc_cache((sk), "address", rtnl_addr_alloc_cache)
/* Returns a struct rtnl_addr pointer; presumably a new object -- confirm. */
extern struct rtnl_addr *nl_cli_addr_alloc(void);
/*
 * Parse helpers. Each takes the address object and a string argument and
 * returns nothing; nl_cli_addr_parse_dev() additionally takes a cache.
 * Only the prototypes are visible here, so error behaviour is unknown.
 */
extern void nl_cli_addr_parse_family(struct rtnl_addr *, char *);
extern void nl_cli_addr_parse_local(struct rtnl_addr *, char *);
extern void nl_cli_addr_parse_dev(struct rtnl_addr *, struct nl_cache *,char *);
extern void nl_cli_addr_parse_label(struct rtnl_addr *, char *);
extern void nl_cli_addr_parse_peer(struct rtnl_addr *, char *);
extern void nl_cli_addr_parse_scope(struct rtnl_addr *, char *);
extern void nl_cli_addr_parse_broadcast(struct rtnl_addr *, char *);
extern void nl_cli_addr_parse_preferred(struct rtnl_addr *, char *);
extern void nl_cli_addr_parse_valid(struct rtnl_addr *, char *);
#ifdef __cplusplus
}
#endif
#endif
qdisc.h 0000644 00000000712 15051120572 0006016 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2008-2011 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI qdisc helper declarations. */
#ifndef __NETLINK_CLI_QDISC_H_
#define __NETLINK_CLI_QDISC_H_
#include <netlink/route/qdisc.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Expands to a call of nl_cli_alloc_cache() with the socket argument, the
 * label "queueing disciplines" and rtnl_qdisc_alloc_cache as the allocation
 * callback. nl_cli_alloc_cache() itself is not declared in this header.
 */
#define nl_cli_qdisc_alloc_cache(sk) \
nl_cli_alloc_cache((sk), "queueing disciplines", \
rtnl_qdisc_alloc_cache)
/* Returns a struct rtnl_qdisc pointer; presumably a new object -- confirm. */
extern struct rtnl_qdisc *nl_cli_qdisc_alloc(void);
#ifdef __cplusplus
}
#endif
#endif
tc.h 0000644 00000002265 15051120572 0005326 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2010-2011 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI traffic-control helper declarations. */
#ifndef __NETLINK_CLI_TC_H_
#define __NETLINK_CLI_TC_H_
#include <netlink/route/tc.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declaration: only pointers to struct rtnl_tc_ops are used below. */
struct rtnl_tc_ops;
/*
 * Parse helpers. Each takes the tc object and a string argument and returns
 * nothing; nl_cli_tc_parse_dev() also takes a cache and
 * nl_cli_tc_parse_handle() an extra int whose meaning is not visible here.
 */
extern void nl_cli_tc_parse_dev(struct rtnl_tc *, struct nl_cache *, char *);
extern void nl_cli_tc_parse_parent(struct rtnl_tc *, char *);
extern void nl_cli_tc_parse_handle(struct rtnl_tc *, char *, int);
extern void nl_cli_tc_parse_mtu(struct rtnl_tc *, char *);
extern void nl_cli_tc_parse_mpu(struct rtnl_tc *, char *);
extern void nl_cli_tc_parse_overhead(struct rtnl_tc *, char *);
extern void nl_cli_tc_parse_linktype(struct rtnl_tc *, char *);
extern void nl_cli_tc_parse_kind(struct rtnl_tc *, char *);
/*
 * Descriptor passed to nl_cli_tc_register()/nl_cli_tc_unregister() and
 * returned by nl_cli_tc_lookup().
 */
struct nl_cli_tc_module
{
/* Name string of the module. */
const char * tm_name;
/* Value of enum rtnl_tc_type associated with the module. */
enum rtnl_tc_type tm_type;
/* Pointer to the tc operations; same type as nl_cli_tc_lookup()'s key. */
struct rtnl_tc_ops * tm_ops;
/* Callback taking the tc object, an int and a char ** (presumably argc/argv -- confirm). */
void (*tm_parse_argv)(struct rtnl_tc *, int, char **);
/* List node; presumably links registered modules together -- confirm. */
struct nl_list_head tm_list;
};
/* Look up a module by its ops pointer; the not-found result is not visible here. */
extern struct nl_cli_tc_module *nl_cli_tc_lookup(struct rtnl_tc_ops *);
extern void nl_cli_tc_register(struct nl_cli_tc_module *);
extern void nl_cli_tc_unregister(struct nl_cli_tc_module *);
#ifdef __cplusplus
}
#endif
#endif
neigh.h 0000644 00000001500 15051120572 0006001 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2008-2009 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI neighbour helper declarations. */
#ifndef __NETLINK_CLI_NEIGH_H_
#define __NETLINK_CLI_NEIGH_H_
#include <netlink/route/neighbour.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Expands to a call of nl_cli_alloc_cache_flags() with the socket argument,
 * the label "neighbour", the flag NL_CACHE_AF_ITER and
 * rtnl_neigh_alloc_cache_flags as the allocation callback.
 * nl_cli_alloc_cache_flags() itself is not declared in this header.
 */
#define nl_cli_neigh_alloc_cache(sk) \
nl_cli_alloc_cache_flags((sk), "neighbour", NL_CACHE_AF_ITER, \
rtnl_neigh_alloc_cache_flags)
/* Returns a struct rtnl_neigh pointer; presumably a new object -- confirm. */
extern struct rtnl_neigh *nl_cli_neigh_alloc(void);
/*
 * Parse helpers. Each takes the neighbour object and a string argument and
 * returns nothing; nl_cli_neigh_parse_dev() additionally takes a cache.
 */
extern void nl_cli_neigh_parse_dst(struct rtnl_neigh *, char *);
extern void nl_cli_neigh_parse_lladdr(struct rtnl_neigh *, char *);
extern void nl_cli_neigh_parse_dev(struct rtnl_neigh *, struct nl_cache *, char *);
extern void nl_cli_neigh_parse_family(struct rtnl_neigh *, char *);
extern void nl_cli_neigh_parse_state(struct rtnl_neigh *, char *);
#ifdef __cplusplus
}
#endif
#endif
utils.h 0000644 00000004256 15051120572 0006062 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2003-2009 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the shared CLI utility declarations. */
#ifndef __NETLINK_CLI_UTILS_H_
#define __NETLINK_CLI_UTILS_H_
/* C library and POSIX headers. */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdarg.h>
#include <limits.h>
#include <inttypes.h>
#include <errno.h>
#include <stdint.h>
#include <ctype.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
/* netlink library headers pulled in for every includer of this file. */
#include <netlink/netlink.h>
#include <netlink/utils.h>
#include <netlink/addr.h>
#include <netlink/list.h>
#include <netlink/route/rtnl.h>
#include <netlink/route/link.h>
#include <netlink/route/addr.h>
#include <netlink/route/neighbour.h>
#include <netlink/route/neightbl.h>
#include <netlink/route/route.h>
#include <netlink/route/rule.h>
#include <netlink/route/qdisc.h>
#include <netlink/route/class.h>
#include <netlink/route/classifier.h>
#include <netlink/route/cls/ematch.h>
#include <netlink/fib_lookup/lookup.h>
#include <netlink/fib_lookup/request.h>
#include <netlink/genl/genl.h>
#include <netlink/genl/ctrl.h>
#include <netlink/genl/mngt.h>
#include <netlink/netfilter/ct.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Shorthands for the compiler's constructor/destructor function attributes.
 * Each is defined only when the includer has not already defined it.
 */
#ifndef __init
#define __init __attribute__((constructor))
#endif
#ifndef __exit
#define __exit __attribute__((destructor))
#endif
/* Takes a string and returns a uint32_t; error handling is not visible here. */
extern uint32_t nl_cli_parse_u32(const char *);
/* Declared noreturn: control does not come back to the caller. */
extern void nl_cli_print_version(void)
__attribute__((noreturn));
/* Declared noreturn; takes an int followed by a printf-like format and arguments
 * (presumably an error code and message -- confirm). */
extern void nl_cli_fatal(int, const char *, ...)
__attribute__((noreturn));
/* Takes a string and an int (presumably an address family -- confirm). */
extern struct nl_addr * nl_cli_addr_parse(const char *, int);
/* Takes a socket and an int (presumably the netlink protocol -- confirm). */
extern int nl_cli_connect(struct nl_sock *, int);
extern struct nl_sock * nl_cli_alloc_socket(void);
extern int nl_cli_parse_dumptype(const char *);
extern int nl_cli_confirm(struct nl_object *,
struct nl_dump_params *, int);
/*
 * Cache allocation wrapper used by the nl_cli_*_alloc_cache() macros.
 * Arguments: socket, a label string, and a callback "ac" that receives the
 * socket and the address of the cache pointer to fill.
 */
extern struct nl_cache *nl_cli_alloc_cache(struct nl_sock *, const char *,
int (*ac)(struct nl_sock *, struct nl_cache **));
/*
 * Same as above with an additional "flags" value; here the callback takes a
 * third unsigned int argument, so two-argument allocators are not compatible.
 */
extern struct nl_cache *nl_cli_alloc_cache_flags(struct nl_sock *, const char *,
unsigned int flags,
int (*ac)(struct nl_sock *, struct nl_cache **, unsigned int));
/* Takes two strings; their roles are not visible from this header. */
extern void nl_cli_load_module(const char *, const char *);
#ifdef __cplusplus
}
#endif
#endif
link.h 0000644 00000002230 15051120572 0005645 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2008-2010 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI link helper declarations. */
#ifndef __NETLINK_CLI_LINK_H_
#define __NETLINK_CLI_LINK_H_
#include <netlink/route/link.h>
#include <netlink/cli/utils.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Returns a struct rtnl_link pointer; presumably a new object -- confirm. */
extern struct rtnl_link *nl_cli_link_alloc(void);
/*
 * Link cache allocators. All take the socket; the "_family" variants take an
 * additional int and the "_flags" variants an additional unsigned int.
 */
extern struct nl_cache *nl_cli_link_alloc_cache_family(struct nl_sock *, int);
extern struct nl_cache *nl_cli_link_alloc_cache_family_flags(struct nl_sock *, int,
unsigned int);
extern struct nl_cache *nl_cli_link_alloc_cache(struct nl_sock *);
extern struct nl_cache *nl_cli_link_alloc_cache_flags(struct nl_sock *,
unsigned int);
/* Parse helpers: each takes the link object and a string argument. */
extern void nl_cli_link_parse_family(struct rtnl_link *, char *);
extern void nl_cli_link_parse_name(struct rtnl_link *, char *);
extern void nl_cli_link_parse_mtu(struct rtnl_link *, char *);
extern void nl_cli_link_parse_ifindex(struct rtnl_link *, char *);
extern void nl_cli_link_parse_txqlen(struct rtnl_link *, char *);
extern void nl_cli_link_parse_weight(struct rtnl_link *, char *);
extern void nl_cli_link_parse_ifalias(struct rtnl_link *, char *);
#ifdef __cplusplus
}
#endif
#endif
route.h 0000644 00000002345 15051120572 0006055 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2008-2009 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI route helper declarations. */
#ifndef __NETLINK_CLI_ROUTE_H_
#define __NETLINK_CLI_ROUTE_H_
#include <netlink/route/route.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Returns a struct rtnl_route pointer; presumably a new object -- confirm. */
extern struct rtnl_route *nl_cli_route_alloc(void);
/* Takes the socket and an int whose meaning is not visible here. */
extern struct nl_cache *nl_cli_route_alloc_cache(struct nl_sock *, int);
/*
 * Parse helpers: each takes the route object and a string argument.
 * nl_cli_route_parse_nexthop() and nl_cli_route_parse_iif() also take a
 * cache as their last argument.
 */
extern void nl_cli_route_parse_family(struct rtnl_route *, char *);
extern void nl_cli_route_parse_dst(struct rtnl_route *, char *);
extern void nl_cli_route_parse_src(struct rtnl_route *, char *);
extern void nl_cli_route_parse_pref_src(struct rtnl_route *, char *);
extern void nl_cli_route_parse_metric(struct rtnl_route *, char *);
extern void nl_cli_route_parse_nexthop(struct rtnl_route *, char *, struct nl_cache *);
extern void nl_cli_route_parse_table(struct rtnl_route *, char *);
extern void nl_cli_route_parse_prio(struct rtnl_route *, char *);
extern void nl_cli_route_parse_scope(struct rtnl_route *, char *);
extern void nl_cli_route_parse_protocol(struct rtnl_route *, char *);
extern void nl_cli_route_parse_type(struct rtnl_route *, char *);
extern void nl_cli_route_parse_iif(struct rtnl_route *, char *, struct nl_cache *);
#ifdef __cplusplus
}
#endif
#endif
ct.h 0000644 00000002351 15051120572 0005322 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2008-2009 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI conntrack helper declarations. */
#ifndef __NETLINK_CLI_CT_H_
#define __NETLINK_CLI_CT_H_
#include <netlink/netfilter/ct.h>
#include <linux/netfilter/nf_conntrack_common.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Returns a struct nfnl_ct pointer; presumably a new object -- confirm. */
extern struct nfnl_ct *nl_cli_ct_alloc(void);
extern struct nl_cache *nl_cli_ct_alloc_cache(struct nl_sock *);
/* Parse helpers taking the conntrack object and a string argument. */
extern void nl_cli_ct_parse_family(struct nfnl_ct *, char *);
extern void nl_cli_ct_parse_protocol(struct nfnl_ct *, char *);
extern void nl_cli_ct_parse_mark(struct nfnl_ct *, char *);
extern void nl_cli_ct_parse_timeout(struct nfnl_ct *, char *);
extern void nl_cli_ct_parse_id(struct nfnl_ct *, char *);
extern void nl_cli_ct_parse_use(struct nfnl_ct *, char *);
/*
 * These four take an extra int between the object and the string
 * (presumably selecting the original/reply direction -- confirm).
 */
extern void nl_cli_ct_parse_src(struct nfnl_ct *, int, char *);
extern void nl_cli_ct_parse_dst(struct nfnl_ct *, int, char *);
extern void nl_cli_ct_parse_src_port(struct nfnl_ct *, int, char *);
extern void nl_cli_ct_parse_dst_port(struct nfnl_ct *, int, char *);
extern void nl_cli_ct_parse_tcp_state(struct nfnl_ct *, char *);
extern void nl_cli_ct_parse_status(struct nfnl_ct *, char *);
extern void nl_cli_ct_parse_zone(struct nfnl_ct *, char *);
#ifdef __cplusplus
}
#endif
#endif
class.h 0000644 00000000656 15051120572 0006027 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2010 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI traffic class helper declarations. */
#ifndef __NETLINK_CLI_CLASS_H_
#define __NETLINK_CLI_CLASS_H_
#include <netlink/route/class.h>
#include <netlink/cli/tc.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Returns a struct rtnl_class pointer; presumably a new object -- confirm. */
extern struct rtnl_class *nl_cli_class_alloc(void);
/* Takes the socket and an int (presumably an interface index -- confirm). */
extern struct nl_cache *nl_cli_class_alloc_cache(struct nl_sock *, int);
#ifdef __cplusplus
}
#endif
#endif
cls.h 0000644 00000001127 15051120572 0005475 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2010 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI classifier helper declarations. */
#ifndef __NETLINK_CLI_CLS_H_
#define __NETLINK_CLI_CLS_H_
#include <netlink/route/classifier.h>
#include <netlink/cli/tc.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Returns a struct rtnl_cls pointer; presumably a new object -- confirm. */
extern struct rtnl_cls * nl_cli_cls_alloc(void);
/* Takes the socket, an int and a uint32_t (presumably ifindex and parent
 * handle -- confirm). */
extern struct nl_cache * nl_cli_cls_alloc_cache(struct nl_sock *,
int, uint32_t);
extern void nl_cli_cls_parse_proto(struct rtnl_cls *, char *);
/* Takes the classifier and a string; returns an ematch tree pointer. */
extern struct rtnl_ematch_tree *nl_cli_cls_parse_ematch(struct rtnl_cls *, char *);
#ifdef __cplusplus
}
#endif
#endif
rule.h 0000644 00000000716 15051120572 0005666 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2008-2009 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI routing rule helper declarations. */
#ifndef __NETLINK_CLI_RULE_H_
#define __NETLINK_CLI_RULE_H_
#include <netlink/route/rule.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Returns a struct rtnl_rule pointer; presumably a new object -- confirm. */
extern struct rtnl_rule *nl_cli_rule_alloc(void);
extern struct nl_cache *nl_cli_rule_alloc_cache(struct nl_sock *);
/* Takes the rule object and a string argument; returns nothing. */
extern void nl_cli_rule_parse_family(struct rtnl_rule *, char *);
#ifdef __cplusplus
}
#endif
#endif
mdb.h 0000644 00000000406 15051120572 0005455 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/* Include guard for the CLI multicast database (mdb) helper macro. */
#ifndef __NETLINK_CLI_MDB_H_
#define __NETLINK_CLI_MDB_H_
#include <netlink/route/mdb.h>
/*
 * Expands to a call of nl_cli_alloc_cache_flags() with the socket argument,
 * the label "mdb" and the flag NL_CACHE_AF_ITER.
 *
 * The callback must be the three-argument (socket, cache **, flags)
 * allocator: nl_cli_alloc_cache_flags() invokes it with the flags value, so
 * passing the two-argument rtnl_mdb_alloc_cache() is an incompatible
 * function-pointer conversion and drops the flags. This mirrors
 * nl_cli_neigh_alloc_cache(), which passes rtnl_neigh_alloc_cache_flags.
 *
 * nl_cli_alloc_cache_flags() is not declared by this header; include
 * <netlink/cli/utils.h> before using the macro.
 */
#define nl_cli_mdb_alloc_cache(sk) \
	nl_cli_alloc_cache_flags((sk), "mdb", NL_CACHE_AF_ITER, \
				 rtnl_mdb_alloc_cache_flags)
#endif
exp.h 0000644 00000003150 15051120572 0005506 0 ustar 00 /* SPDX-License-Identifier: LGPL-2.1-only */
/*
* Copyright (c) 2012 Rich Fought <Rich.Fought@watchguard.com>
* Copyright (c) 2008-2009 Thomas Graf <tgraf@suug.ch>
*/
/* Include guard for the CLI conntrack expectation helper declarations. */
#ifndef __NETLINK_CLI_EXP_H_
#define __NETLINK_CLI_EXP_H_
#include <netlink/netfilter/exp.h>
#include <linux/netfilter/nf_conntrack_common.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Returns a struct nfnl_exp pointer; presumably a new object -- confirm. */
extern struct nfnl_exp *nl_cli_exp_alloc(void);
extern struct nl_cache *nl_cli_exp_alloc_cache(struct nl_sock *);
/* Parse helpers taking the expectation object and a string argument. */
extern void nl_cli_exp_parse_family(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_timeout(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_id(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_helper_name(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_zone(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_flags(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_class(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_nat_dir(struct nfnl_exp *, char *);
extern void nl_cli_exp_parse_fn(struct nfnl_exp *, char *);
/*
 * The remaining helpers take an extra int between the object and the string
 * (presumably selecting which tuple of the expectation is set -- confirm).
 */
extern void nl_cli_exp_parse_src(struct nfnl_exp *, int, char *);
extern void nl_cli_exp_parse_dst(struct nfnl_exp *, int, char *);
extern void nl_cli_exp_parse_l4protonum(struct nfnl_exp *, int, char *);
extern void nl_cli_exp_parse_src_port(struct nfnl_exp *, int, char *);
extern void nl_cli_exp_parse_dst_port(struct nfnl_exp *, int, char *);
extern void nl_cli_exp_parse_icmp_id(struct nfnl_exp *, int, char *);
extern void nl_cli_exp_parse_icmp_type(struct nfnl_exp *, int, char *);
extern void nl_cli_exp_parse_icmp_code(struct nfnl_exp *, int, char *);
#ifdef __cplusplus
}
#endif
#endif
demand.py 0000644 00000004747 15051234445 0006365 0 ustar 00 # demand.py
# Demand sheet and related classes.
#
# Copyright (C) 2014-2015 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import unicode_literals
class _BoolDefault(object):
def __init__(self, default):
self.default = default
self._storing_name = '__%s%x' % (self.__class__.__name__, id(self))
def __get__(self, obj, objtype=None):
objdict = obj.__dict__
if self._storing_name in objdict:
return objdict[self._storing_name]
return self.default
def __set__(self, obj, val):
objdict = obj.__dict__
if self._storing_name in objdict:
current_val = objdict[self._storing_name]
if current_val != val:
raise AttributeError('Demand already set.')
objdict[self._storing_name] = val
class DemandSheet(object):
    """Collection of demands that different CLI parts have on other parts. :api"""

    # :api...
    # Each _BoolDefault attribute carries the default given here until a value
    # is stored on the instance.
    allow_erasing = _BoolDefault(False)
    available_repos = _BoolDefault(False)
    resolving = _BoolDefault(False)
    root_user = _BoolDefault(False)
    sack_activation = _BoolDefault(False)
    load_system_repo = _BoolDefault(True)
    # Plain class attribute (not a _BoolDefault descriptor).
    success_exit_status = 0

    cacheonly = _BoolDefault(False)
    fresh_metadata = _BoolDefault(True)
    freshest_metadata = _BoolDefault(False)
    changelogs = _BoolDefault(False)

    # Plain class attribute (not a _BoolDefault descriptor).
    transaction_display = None

    # This demand controls applicability of the plugins that could filter
    # repositories packages (e.g. versionlock).
    # If it stays None, the demands.resolving is used as a fallback.
    plugin_filtering_enabled = _BoolDefault(None)
aliases.py 0000644 00000015735 15051234446 0006556 0 ustar 00 # aliases.py
# Resolving aliases in CLI arguments.
#
# Copyright (C) 2018 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import absolute_import
from __future__ import unicode_literals
from dnf.i18n import _
import collections
import dnf.cli
from dnf.conf.config import PRIO_DEFAULT
import dnf.exceptions
import libdnf.conf
import logging
import os
import os.path
logger = logging.getLogger('dnf')
ALIASES_DROPIN_DIR = '/etc/dnf/aliases.d/'
ALIASES_CONF_PATH = os.path.join(ALIASES_DROPIN_DIR, 'ALIASES.conf')
ALIASES_USER_PATH = os.path.join(ALIASES_DROPIN_DIR, 'USER.conf')
class AliasesConfig(object):
    """Reader for one aliases configuration file.

    The file at ``path`` is read by a ``libdnf.conf.ConfigParser`` when the
    object is constructed.
    """

    def __init__(self, path):
        self._path = path
        self._parser = libdnf.conf.ConfigParser()
        self._parser.read(self._path)

    @property
    def enabled(self):
        """Value of ``[main] enabled`` as a bool; True when the lookup fails."""
        flag = libdnf.conf.OptionBool(True)
        try:
            raw_value = self._parser.getData()["main"]["enabled"]
            flag.set(PRIO_DEFAULT, raw_value)
        except IndexError:
            # Lookup raised IndexError: keep the option's initial value.
            pass
        return flag.getValue()

    @property
    def aliases(self):
        """Ordered mapping of alias name -> list of whitespace-split words.

        Keys whose value is empty are left out; a file without an
        ``[aliases]`` section yields an empty mapping.
        """
        found = collections.OrderedDict()
        section = "aliases"
        if self._parser.hasSection(section):
            for key in self._parser.options(section):
                text = self._parser.getValue(section, key)
                if text:
                    found[key] = text.split()
        return found
class Aliases(object):
    """Loads alias definitions and expands them in command-line arguments."""

    def __init__(self):
        self.aliases = collections.OrderedDict()
        self.conf = None
        self.enabled = True

        if self._disabled_by_environ():
            self.enabled = False
            return

        self._load_main()
        if self.enabled:
            self._load_aliases()

    def _disabled_by_environ(self):
        """Interpret the DNF_DISABLE_ALIASES environment variable.

        Returns False when the variable is unset, the parsed boolean when it
        is valid, and True (after logging a warning) when parsing raises
        RuntimeError.
        """
        flag = libdnf.conf.OptionBool(True)
        try:
            flag.set(PRIO_DEFAULT, os.environ['DNF_DISABLE_ALIASES'])
            return flag.getValue()
        except KeyError:
            return False
        except RuntimeError:
            logger.warning(
                _('Unexpected value of environment variable: '
                  'DNF_DISABLE_ALIASES=%s'), os.environ['DNF_DISABLE_ALIASES'])
            return True

    def _load_conf(self, path):
        """Build an AliasesConfig for ``path``; failures become ConfigError."""
        try:
            return AliasesConfig(path)
        except RuntimeError as err:
            raise dnf.exceptions.ConfigError(
                _('Parsing file "%s" failed: %s') % (path, err))
        except IOError as err:
            raise dnf.exceptions.ConfigError(
                _('Cannot read file "%s": %s') % (path, err))

    def _load_main(self):
        """Load ALIASES_CONF_PATH and take ``enabled`` from it.

        A ConfigError is only logged at debug level; ``self.enabled`` then
        keeps its previous value.
        """
        try:
            self.conf = self._load_conf(ALIASES_CONF_PATH)
            self.enabled = self.conf.enabled
        except dnf.exceptions.ConfigError as err:
            logger.debug(_('Config error: %s'), err)

    def _load_aliases(self, filenames=None):
        """Merge aliases from ``filenames`` (default: the drop-in directory).

        Later files override earlier ones. Files that fail to load are
        reported with a warning and skipped.
        """
        if filenames is None:
            try:
                filenames = self._dropin_dir_filenames()
            except dnf.exceptions.ConfigError:
                return

        for path in filenames:
            try:
                conf = self._load_conf(path)
                if conf.enabled:
                    self.aliases.update(conf.aliases)
            except dnf.exceptions.ConfigError as err:
                logger.warning(_('Config error: %s'), err)

    def _dropin_dir_filenames(self):
        """Return the alias config paths to load, in load order.

        All eligible files from ALIASES_DROPIN_DIR come first (sorted), and
        ALIASES_USER_PATH, when it exists, comes last so it overrides the
        others. The directory is created when it does not exist.
        """
        skipped = [os.path.basename(ALIASES_CONF_PATH),
                   os.path.basename(ALIASES_USER_PATH)]

        def _wanted(name):
            # Keep visible *.conf / *.CONF files other than the two special ones.
            return (name not in skipped and
                    not name.startswith('.') and
                    name.endswith(('.conf', '.CONF')))

        try:
            if not os.path.exists(ALIASES_DROPIN_DIR):
                os.mkdir(ALIASES_DROPIN_DIR)
            filenames = [os.path.join(ALIASES_DROPIN_DIR, name)
                         for name in sorted(os.listdir(ALIASES_DROPIN_DIR))
                         if _wanted(name)]
        except (IOError, OSError) as err:
            raise dnf.exceptions.ConfigError(err)

        if os.path.exists(ALIASES_USER_PATH):
            filenames.append(ALIASES_USER_PATH)
        return filenames

    def _resolve(self, args):
        """Expand aliases in ``args`` and return the new argument list.

        Leading option-like words (empty, or starting with '-') met while
        resolving are collected in ``self.prefix_options`` and placed in front
        of the result. Raises dnf.exceptions.Error when an alias refers back
        to one that is currently being expanded.
        """
        stack = []
        self.prefix_options = []

        def store_prefix(words):
            # Move the leading run of option-like words to prefix_options.
            count = 0
            for word in words:
                if word and word[0] != '-':
                    break
                count += 1
            self.prefix_options += words[:count]
            return words[count:]

        def subresolve(words):
            suffix = store_prefix(words)

            finished = (not suffix or                    # current alias on stack is resolved
                        suffix[0] not in self.aliases or  # end resolving
                        suffix[0].startswith('\\'))       # end resolving
            if finished:
                try:
                    stack.pop()
                    # strip the '\' if it exists
                    if suffix[0].startswith('\\'):
                        suffix[0] = suffix[0][1:]
                except IndexError:
                    # Either the stack or the suffix was empty.
                    pass
                return suffix

            head = suffix[0]
            if head in stack:  # infinite recursion detected
                raise dnf.exceptions.Error(
                    _('Aliases contain infinite recursion'))

            # Next word must be an alias
            stack.append(head)
            expanded = subresolve(self.aliases[head])
            if expanded:
                # We reached non-alias or '\'
                return expanded + suffix[1:]
            # Need to resolve aliases in the rest
            return subresolve(suffix[1:])

        suffix = subresolve(args)
        return self.prefix_options + suffix

    def resolve(self, args):
        """Return ``args`` with aliases expanded when aliases are enabled.

        On a dnf.exceptions.Error the problem is logged and the original
        arguments are returned.
        """
        if self.enabled:
            try:
                args = self._resolve(args)
            except dnf.exceptions.Error as err:
                logger.error(_('%s, using original arguments.'), err)
        return args
utils.py 0000644 00000010650 15051234446 0006264 0 ustar 00 # Copyright (C) 2016 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Various utility functions, and a utility class."""
from __future__ import absolute_import
from __future__ import unicode_literals
from dnf.cli.format import format_number
from dnf.i18n import _
import dnf.util
import logging
import os
import time
_USER_HZ = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
logger = logging.getLogger('dnf')
def jiffies_to_seconds(jiffies):
    """Convert a jiffies count into seconds.

    The number of jiffies per second is system-dependent (100 is common);
    the module-level ``_USER_HZ`` value is used as the divisor.

    :param jiffies: a number of jiffies (anything accepted by ``int()``)
    :return: the equivalent number of seconds
    """
    ticks = int(jiffies)
    return ticks / _USER_HZ
def seconds_to_ui_time(seconds):
    """Format a duration given in seconds for display.

    The result is ``MM:SS`` below one hour, ``H:MM:SS`` below one day and
    ``D day(s) H:MM:SS`` from one day upwards.

    :param seconds: the length of the time interval in seconds
    :return: a human-readable string representation of the interval
    """
    minute = 60
    hour = 60 * 60
    day = 60 * 60 * 24

    if seconds >= day:
        fields = (seconds // day,
                  (seconds // hour) % 24,
                  (seconds // minute) % 60,
                  seconds % 60)
        return "%d day(s) %d:%02d:%02d" % fields
    if seconds >= hour:
        fields = (seconds // hour, (seconds // minute) % 60, seconds % 60)
        return "%d:%02d:%02d" % fields
    return "%02d:%02d" % (seconds // minute, seconds % 60)
def get_process_info(pid):
    """Return info dict about a process.

    The dict holds the lower-cased keys of ``/proc/<pid>/status`` (values with
    a trailing ' kB' removed) plus ``start_time`` (boot time from
    ``/proc/stat`` plus the process start time converted from jiffies) and
    ``state`` (a translated description of the one-letter process state).

    :param pid: process id (anything accepted by ``int()``)
    :return: the info dict, or None when the required /proc files or fields
        are not available
    """
    pid = int(pid)

    # Maybe true if /proc isn't mounted, or not Linux ... or something.
    if (not os.path.exists("/proc/%d/status" % pid) or
            not os.path.exists("/proc/stat") or
            not os.path.exists("/proc/%d/stat" % pid)):
        return

    ps = {}
    with open("/proc/%d/status" % pid) as status_file:
        for line in status_file:
            # Skip a final line that is not newline-terminated.
            if line[-1] != '\n':
                continue
            data = line[:-1].split(':\t', 1)
            if len(data) < 2:
                continue
            data[1] = dnf.util.rtrim(data[1], ' kB')
            ps[data[0].strip().lower()] = data[1].strip()
    if 'vmrss' not in ps:
        return
    if 'vmsize' not in ps:
        return

    boot_time = None
    with open("/proc/stat") as stat_file:
        for line in stat_file:
            if line.startswith("btime "):
                boot_time = int(line[len("btime "):-1])
                break
    if boot_time is None:
        return

    with open('/proc/%d/stat' % pid) as stat_file:
        stat_contents = stat_file.read()

    # The second field (comm) is wrapped in parentheses and may itself
    # contain spaces or ')', so a plain split() would shift every later
    # field. Everything after the LAST ')' is safe to split on whitespace.
    before, paren, after = stat_contents.rpartition(')')
    if paren:
        ps_stat = after.split()
    else:
        # No parenthesised comm found: drop the pid and comm columns.
        ps_stat = stat_contents.split()[2:]
    # ps_stat[0] is the state (3rd field), ps_stat[19] the start time
    # (22nd field) of the stat line.
    if len(ps_stat) < 20:
        return

    ps['start_time'] = boot_time + jiffies_to_seconds(ps_stat[19])
    ps['state'] = {'R' : _('Running'),
                   'S' : _('Sleeping'),
                   'D' : _('Uninterruptible'),
                   'Z' : _('Zombie'),
                   'T' : _('Traced/Stopped')
                   }.get(ps_stat[0], _('Unknown'))

    return ps
def show_lock_owner(pid):
    """Log (at critical level) details of the process holding a lock.

    When no information can be gathered for ``pid`` a single message saying
    so is logged; otherwise name, memory use, start time and state are
    logged on separate lines.
    """
    info = get_process_info(pid)
    if info:
        header = _(' The application with PID %d is: %s') % (pid, info['name'])
        logger.critical("%s", header)

        # Values are multiplied by 1024 before formatting.
        logger.critical(_(" Memory : %5s RSS (%5sB VSZ)"),
                        format_number(int(info['vmrss']) * 1024),
                        format_number(int(info['vmsize']) * 1024))

        elapsed = int(time.time()) - info['start_time']
        ago = seconds_to_ui_time(elapsed)
        logger.critical(_(' Started: %s - %s ago'),
                        dnf.util.normalize_time(info['start_time']), ago)
        logger.critical(_(' State : %s'), info['state'])
    else:
        msg = _('Unable to find information about the locking process (PID %d)')
        logger.critical(msg, pid)

    return
completion_helper.py 0000644 00000016457 15051234446 0010647 0 ustar 00 #!/usr/libexec/platform-python
#
# This file is part of dnf.
#
# Copyright 2015 (C) Igor Gnatenko <i.gnatenko.brain@gmail.com>
# Copyright 2016 (C) Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
import dnf.exceptions
import dnf.cli
import dnf.cli.commands.clean
import sys
def filter_list_by_kw(kw, lst):
    """Return the items of ``lst`` whose ``str()`` form starts with ``kw``.

    The result is whatever the built-in ``filter`` returns.
    """
    def _has_prefix(item):
        return str(item).startswith(kw)

    return filter(_has_prefix, lst)
def listpkg_to_setstr(pkgs):
    """Return the set of ``str()`` representations of the items in ``pkgs``."""
    return {str(pkg) for pkg in pkgs}
class RemoveCompletionCommand(dnf.cli.commands.remove.RemoveCommand):
    """Completion variant of the remove command: prints installed matches."""

    def __init__(self, args):
        super(RemoveCompletionCommand, self).__init__(args)

    def configure(self):
        # No root required; the sack must be activated.
        demands = self.cli.demands
        demands.root_user = False
        demands.sack_activation = True

    def run(self):
        matches = ListCompletionCommand.installed(self.base, self.opts.pkg_specs)
        for pkg in matches:
            print(str(pkg))
class InstallCompletionCommand(dnf.cli.commands.install.InstallCommand):
    """Completion variant of the install command.

    Prints packages that are available but not installed.
    """

    def __init__(self, args):
        super(InstallCompletionCommand, self).__init__(args)

    def configure(self):
        # No root required; repositories and the sack must be set up.
        demands = self.cli.demands
        demands.root_user = False
        demands.available_repos = True
        demands.sack_activation = True

    def run(self):
        installed = listpkg_to_setstr(
            ListCompletionCommand.installed(self.base, self.opts.pkg_specs))
        available = listpkg_to_setstr(
            ListCompletionCommand.available(self.base, self.opts.pkg_specs))
        candidates = available - installed
        for pkg in candidates:
            print(str(pkg))
class ReinstallCompletionCommand(dnf.cli.commands.reinstall.ReinstallCommand):
    """Completion variant of the reinstall command.

    Prints packages that are both installed and available.
    """

    def __init__(self, args):
        super(ReinstallCompletionCommand, self).__init__(args)

    def configure(self):
        # No root required; repositories and the sack must be set up.
        demands = self.cli.demands
        demands.root_user = False
        demands.available_repos = True
        demands.sack_activation = True

    def run(self):
        installed = listpkg_to_setstr(
            ListCompletionCommand.installed(self.base, self.opts.pkg_specs))
        available = listpkg_to_setstr(
            ListCompletionCommand.available(self.base, self.opts.pkg_specs))
        candidates = installed & available
        for pkg in candidates:
            print(str(pkg))
# Completion variant of the list command: prints candidate sub-commands or
# package names, one per line.
# NOTE(review): the leading indentation of this block has been lost in this
# copy, so the nesting of the statements in run() (in particular which branch
# `if not pkgs:` belongs to) cannot be determined from here -- restore it from
# a pristine copy before editing the logic.
class ListCompletionCommand(dnf.cli.commands.ListCommand):
def __init__(self, args):
super(ListCompletionCommand, self).__init__(args)
def run(self):
# Known sub-commands, the positional arguments and the selected action.
subcmds = self.pkgnarrows
args = self.opts.packages
action = self.opts.packages_action
# A second argument that is not a known sub-command is completed against
# the sub-command names.
if len(args) > 1 and args[1] not in subcmds:
print("\n".join(filter_list_by_kw(args[1], subcmds)))
else:
if action == "installed":
pkgs = self.installed(self.base, args)
elif action == "available":
pkgs = self.available(self.base, args)
elif action == "updates":
pkgs = self.updates(self.base, args)
else:
# Any other action: union of available and installed names.
available = listpkg_to_setstr(self.available(self.base, args))
installed = listpkg_to_setstr(self.installed(self.base, args))
pkgs = (available | installed)
# Nothing matched: offer sub-commands starting with the first argument.
if not pkgs:
print("\n".join(filter_list_by_kw(args[0], subcmds)))
return
for pkg in pkgs:
print(str(pkg))
# The three helpers below build the glob "<arg[0]>*" from the first element
# of `arg` only.
@staticmethod
def installed(base, arg):
return base.sack.query().installed().filterm(name__glob="{}*".format(arg[0]))
@staticmethod
def available(base, arg):
return base.sack.query().available().filterm(name__glob="{}*".format(arg[0]))
@staticmethod
def updates(base, arg):
return base.check_updates(["{}*".format(arg[0])], print_=False)
class RepoListCompletionCommand(dnf.cli.commands.repolist.RepoListCommand):
    """Completion variant of the repolist command: prints matching repo ids."""

    def __init__(self, args):
        super(RepoListCompletionCommand, self).__init__(args)

    def run(self):
        opts = self.opts
        action = opts.repos_action
        # Any other action prints nothing.
        if action not in ("enabled", "disabled", "all"):
            return

        keyword = opts.repos[0]
        if action == "enabled":
            repo_ids = [repo.id for repo in self.base.repos.iter_enabled()]
        elif action == "disabled":
            repo_ids = [repo.id for repo in self.base.repos.all()
                        if not repo.enabled]
        else:
            repo_ids = [repo.id for repo in self.base.repos.all()]
        print("\n".join(filter_list_by_kw(keyword, repo_ids)))
class UpgradeCompletionCommand(dnf.cli.commands.upgrade.UpgradeCommand):
    """Completion variant of the upgrade command: prints available updates."""

    def __init__(self, args):
        super(UpgradeCompletionCommand, self).__init__(args)

    def configure(self):
        # No root required; repositories and the sack must be set up.
        demands = self.cli.demands
        demands.root_user = False
        demands.available_repos = True
        demands.sack_activation = True

    def run(self):
        updates = ListCompletionCommand.updates(self.base, self.opts.pkg_specs)
        for pkg in updates:
            print(str(pkg))
class DowngradeCompletionCommand(dnf.cli.commands.downgrade.DowngradeCommand):
    """Completion variant of the downgrade command.

    Prints the ``downgrades()`` subset of the matching available packages.
    """

    def __init__(self, args):
        super(DowngradeCompletionCommand, self).__init__(args)

    def configure(self):
        # No root required; repositories and the sack must be set up.
        demands = self.cli.demands
        demands.root_user = False
        demands.available_repos = True
        demands.sack_activation = True

    def run(self):
        available = ListCompletionCommand.available(self.base, self.opts.pkg_specs)
        for pkg in available.downgrades():
            print(str(pkg))
class CleanCompletionCommand(dnf.cli.commands.clean.CleanCommand):
    """Completion variant of the clean command: prints matching cache types."""

    def __init__(self, args):
        super(CleanCompletionCommand, self).__init__(args)

    def run(self):
        cache_types = dnf.cli.commands.clean._CACHE_TYPES.keys()
        # The keyword to complete is the second element of the type option.
        keyword = self.opts.type[1]
        print("\n".join(filter_list_by_kw(keyword, cache_types)))
def main(args):
    """Entry point of the completion helper.

    With ``_cmds`` as first argument, prints the registered command names
    starting with the second argument. Otherwise the regular commands are
    replaced by their completion variants and the CLI is run; OSError and
    dnf errors raised while running end the process with status 0.
    """
    base = dnf.cli.cli.BaseCli()
    cli = dnf.cli.Cli(base)

    if args[0] == "_cmds":
        base.init_plugins([], [], cli)
        print("\n".join(filter_list_by_kw(args[1], cli.cli_commands)))
        return

    completion_commands = (
        RemoveCompletionCommand,
        InstallCompletionCommand,
        ReinstallCompletionCommand,
        ListCompletionCommand,
        RepoListCompletionCommand,
        UpgradeCompletionCommand,
        DowngradeCompletionCommand,
        CleanCompletionCommand,
    )
    cli.cli_commands.clear()
    for command in completion_commands:
        cli.register_command(command)

    cli.configure(args)
    try:
        cli.run()
    except (OSError, dnf.exceptions.Error):
        sys.exit(0)


if __name__ == "__main__":
    # Ctrl-C during completion exits with status 1 instead of a traceback.
    try:
        main(sys.argv[1:])
    except KeyboardInterrupt:
        sys.exit(1)
option_parser.py 0000644 00000056464 15051234446 0010025 0 ustar 00 # optparse.py
# CLI options parser.
#
# Copyright (C) 2014-2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import unicode_literals
from dnf.i18n import _
from dnf.util import _parse_specs
import argparse
import dnf.exceptions
import dnf.util
import dnf.rpm
import dnf.yum.misc
import logging
import os.path
import re
import sys
logger = logging.getLogger("dnf")
class MultilineHelpFormatter(argparse.HelpFormatter):
    """Help formatter that keeps explicit line breaks in help texts."""

    def _split_lines(self, text, width):
        """Return the lines of ``text`` to print.

        Text without a newline is wrapped by the stock formatter; text that
        contains a newline is split at its own line breaks and not re-wrapped.
        """
        if '\n' not in text:
            return super(MultilineHelpFormatter, self)._split_lines(text, width)
        return text.splitlines()
class OptionParser(argparse.ArgumentParser):
"""ArgumentParser like class to do things the "yum way"."""
def __init__(self, reset_usage=True):
    """Create the parser and register the general options.

    :param reset_usage: when true, start with empty command usage data
        (``_cmd_usage`` and ``_cmd_groups``); when false these attributes
        are not assigned here
    """
    super(OptionParser, self).__init__(
        formatter_class=MultilineHelpFormatter, add_help=False)
    self.command_group = None
    self.command_positional_parser = None
    self._add_general_options()
    if not reset_usage:
        return
    # names, summary for dnf commands, to build usage
    self._cmd_usage = {}
    # cmd groups added (main, plugin)
    self._cmd_groups = set()
def error(self, msg):
    """Report a command-line error and terminate the program.

    Overrides the standard argparse ``error`` so that the message is sent
    to the ``dnf`` logger (at critical level) after the usage is printed.
    The process then exits with status 1.

    :param msg: the error message to output
    """
    self.print_usage()
    logger.critical(_("Command line error: %s"), msg)
    sys.exit(1)
class _RepoCallback(argparse.Action):
def __call__(self, parser, namespace, values, opt_str):
operation = 'disable' if opt_str == '--disablerepo' else 'enable'
l = getattr(namespace, self.dest)
l.extend((x, operation) for x in re.split(r'\s*[,\s]\s*', values))
class _RepoCallbackEnable(argparse.Action):
def __call__(self, parser, namespace, values, opt_str):
namespace.repos_ed.append((values[0], 'enable'))
setattr(namespace, 'reponame', values)
class _SplitCallback(argparse._AppendAction):
    """ Split the option value at "," and whitespace and append every
    resulting item to the destination list. """
    SPLITTER = r'\s*[,\s]\s*'

    def __call__(self, parser, namespace, values, opt_str):
        append = super(OptionParser._SplitCallback, self).__call__
        for position, val in enumerate(re.split(self.SPLITTER, values)):
            # Empty values are sometimes used to clear existing content of
            # the option.  Only the first value in the parsed string can be
            # empty.  Other empty values are ignored.
            if position > 0 and not val:
                continue
            append(parser, namespace, val, opt_str)
class _SplitExtendDictCallback(argparse.Action):
""" Split string at "," or whitespace to (key, value).
Extends dict with {key: value}."""
def __call__(self, parser, namespace, values, opt_str):
try:
key, val = values.split(',')
if not key or not val:
raise ValueError
except ValueError:
msg = _('bad format: %s') % values
raise argparse.ArgumentError(self, msg)
dct = getattr(namespace, self.dest)
dct[key] = val
class _SetoptsCallback(argparse.Action):
""" Parse setopts arguments and put them into main_<setopts>
and repo_<setopts>."""
def __call__(self, parser, namespace, values, opt_str):
vals = values.split('=')
if len(vals) > 2:
logger.warning(_("Setopt argument has multiple values: %s"), values)
return
if len(vals) < 2:
logger.warning(_("Setopt argument has no value: %s"), values)
return
k, v = vals
period = k.rfind('.')
if period != -1:
repo = k[:period]
k = k[period+1:]
if hasattr(namespace, 'repo_setopts'):
repoopts = namespace.repo_setopts
else:
repoopts = {}
repoopts.setdefault(repo, {}).setdefault(k, []).append(v)
setattr(namespace, 'repo_' + self.dest, repoopts)
else:
if hasattr(namespace, 'main_setopts'):
mainopts = namespace.main_setopts
else:
mainopts = {}
mainopts.setdefault(k, []).append(v)
setattr(namespace, 'main_' + self.dest, mainopts)
# argparse action that forwards the namespace and the parsed values to
# dnf.util._parse_specs(); this action itself stores nothing under self.dest.
class ParseSpecGroupFileCallback(argparse.Action):
def __call__(self, parser, namespace, values, opt_str):
_parse_specs(namespace, values)
class PkgNarrowCallback(argparse.Action):
    """Separate an optional leading "narrow" keyword from positional values.

    ``choices`` and ``default`` are mandatory keyword arguments; they are
    kept in ``self.pkgnarrow`` and are not passed on as given (the
    underlying action gets ``default=[]``).  On a call, if the first value
    is one of ``choices`` it is removed from the values and stored as
    ``<dest>_action``; otherwise ``<dest>_action`` gets the configured
    default.  The remaining values are stored under ``<dest>``.
    """

    def __init__(self, *args, **kwargs):
        self.pkgnarrow = {}
        try:
            for required in ('choices', 'default'):
                self.pkgnarrow[required] = kwargs.pop(required)
        except KeyError as e:
            raise TypeError("%s() missing mandatory argument %s"
                            % (self.__class__.__name__, e))
        kwargs['default'] = []
        super(OptionParser.PkgNarrowCallback, self).__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, opt_str):
        if values and values[0] in self.pkgnarrow['choices']:
            narrow = values.pop(0)
        else:
            narrow = self.pkgnarrow['default']
        setattr(namespace, self.dest + '_action', narrow)
        setattr(namespace, self.dest, values)
class ForceArchAction(argparse.Action):
    """Store the given architecture in ``arch`` and set ``ignorearch``."""

    def __call__(self, parser, namespace, values, opt_str):
        setattr(namespace, 'ignorearch', True)
        setattr(namespace, 'arch', values)
def _add_general_options(self):
    """ Standard options known to all dnf subcommands.

    Registers every general option in one argument group, plus the
    optional positional ``command`` argument (hidden from help).
    """
    # All defaults need to be a None, so we can always tell whether the user
    # has set something or whether we are getting a default.
    # Translate the template first and substitute afterwards: formatting
    # inside _() would look up the already-substituted text, which is never
    # present in a translation catalog.
    general_grp = self.add_argument_group(
        _('General {prog} options').format(prog=dnf.util.MAIN_PROG_UPPER))
    general_grp.add_argument("-c", "--config", dest="config_file_path",
                             default=None, metavar='[config file]',
                             help=_("config file location"))
    general_grp.add_argument("-q", "--quiet", dest="quiet",
                             action="store_true", default=None,
                             help=_("quiet operation"))
    general_grp.add_argument("-v", "--verbose", action="store_true",
                             default=None, help=_("verbose operation"))
    general_grp.add_argument("--version", action="store_true", default=None,
                             help=_("show {prog} version and exit").format(
                                 prog=dnf.util.MAIN_PROG_UPPER))
    general_grp.add_argument("--installroot", help=_("set install root"),
                             metavar='[path]')
    general_grp.add_argument("--nodocs", action="store_const", const=['nodocs'], dest='tsflags',
                             help=_("do not install documentations"))
    general_grp.add_argument("--noplugins", action="store_false",
                             default=None, dest='plugins',
                             help=_("disable all plugins"))
    general_grp.add_argument("--enableplugin", dest="enableplugin",
                             default=[], action=self._SplitCallback,
                             help=_("enable plugins by name"),
                             metavar='[plugin]')
    general_grp.add_argument("--disableplugin", dest="disableplugin",
                             default=[], action=self._SplitCallback,
                             help=_("disable plugins by name"),
                             metavar='[plugin]')
    general_grp.add_argument("--releasever", default=None,
                             help=_("override the value of $releasever"
                                    " in config and repo files"))
    general_grp.add_argument("--setopt", dest="setopts", default=[],
                             action=self._SetoptsCallback,
                             help=_("set arbitrary config and repo options"))
    general_grp.add_argument("--skip-broken", dest="skip_broken", action="store_true",
                             default=None,
                             help=_("resolve depsolve problems by skipping packages"))
    general_grp.add_argument('-h', '--help', '--help-cmd',
                             action="store_true", dest='help',
                             help=_("show command help"))
    general_grp.add_argument('--allowerasing', action='store_true',
                             default=None,
                             help=_('allow erasing of installed packages to '
                                    'resolve dependencies'))
    best_group = general_grp.add_mutually_exclusive_group()
    best_group.add_argument("-b", "--best", action="store_true", dest='best', default=None,
                            help=_("try the best available package versions in transactions."))
    best_group.add_argument("--nobest", action="store_false", dest='best',
                            help=_("do not limit the transaction to the best candidate"))
    general_grp.add_argument("-C", "--cacheonly", dest="cacheonly",
                             action="store_true", default=None,
                             help=_("run entirely from system cache, "
                                    "don't update cache"))
    general_grp.add_argument("-R", "--randomwait", dest="sleeptime", type=int,
                             default=None, metavar='[minutes]',
                             help=_("maximum command wait time"))
    general_grp.add_argument("-d", "--debuglevel", dest="debuglevel",
                             metavar='[debug level]', default=None,
                             help=_("debugging output level"), type=int)
    general_grp.add_argument("--debugsolver",
                             action="store_true", default=None,
                             help=_("dumps detailed solving results into"
                                    " files"))
    general_grp.add_argument("--showduplicates", dest="showdupesfromrepos",
                             action="store_true", default=None,
                             help=_("show duplicates, in repos, "
                                    "in list/search commands"))
    general_grp.add_argument("-e", "--errorlevel", default=None, type=int,
                             help=_("error output level"))
    general_grp.add_argument("--obsoletes", default=None, dest="obsoletes",
                             action="store_true",
                             help=_("enables {prog}'s obsoletes processing logic "
                                    "for upgrade or display capabilities that "
                                    "the package obsoletes for info, list and "
                                    "repoquery").format(prog=dnf.util.MAIN_PROG))
    general_grp.add_argument("--rpmverbosity", default=None,
                             help=_("debugging output level for rpm"),
                             metavar='[debug level name]')
    general_grp.add_argument("-y", "--assumeyes", action="store_true",
                             default=None, help=_("automatically answer yes"
                                                  " for all questions"))
    general_grp.add_argument("--assumeno", action="store_true",
                             default=None, help=_("automatically answer no"
                                                  " for all questions"))
    general_grp.add_argument("--enablerepo", action=self._RepoCallback,
                             dest='repos_ed', default=[], metavar='[repo]',
                             help=_("Enable additional repositories. List option. "
                                    "Supports globs, can be specified multiple times."))
    repo_group = general_grp.add_mutually_exclusive_group()
    repo_group.add_argument("--disablerepo", action=self._RepoCallback,
                            dest='repos_ed', default=[], metavar='[repo]',
                            help=_("Disable repositories. List option. "
                                   "Supports globs, can be specified multiple times."))
    repo_group.add_argument('--repo', '--repoid', metavar='[repo]', dest='repo',
                            action=self._SplitCallback, default=[],
                            help=_('enable just specific repositories by an id or a glob, '
                                   'can be specified multiple times'))
    enable_group = general_grp.add_mutually_exclusive_group()
    enable_group.add_argument("--enable", default=False,
                              dest="set_enabled", action="store_true",
                              help=_("enable repos with config-manager "
                                     "command (automatically saves)"))
    enable_group.add_argument("--disable", default=False,
                              dest="set_disabled", action="store_true",
                              help=_("disable repos with config-manager "
                                     "command (automatically saves)"))
    general_grp.add_argument("-x", "--exclude", "--excludepkgs", default=[],
                             dest='excludepkgs', action=self._SplitCallback,
                             help=_("exclude packages by name or glob"),
                             metavar='[package]')
    general_grp.add_argument("--disableexcludes", "--disableexcludepkgs",
                             default=[], dest="disable_excludes",
                             action=self._SplitCallback,
                             help=_("disable excludepkgs"),
                             metavar='[repo]')
    general_grp.add_argument("--repofrompath", default={},
                             action=self._SplitExtendDictCallback,
                             metavar='[repo,path]',
                             help=_("label and path to an additional repository to use (same "
                                    "path as in a baseurl), can be specified multiple times."))
    general_grp.add_argument("--noautoremove", action="store_false",
                             default=None, dest='clean_requirements_on_remove',
                             help=_("disable removal of dependencies that are no longer used"))
    general_grp.add_argument("--nogpgcheck", action="store_false",
                             default=None, dest='gpgcheck',
                             help=_("disable gpg signature checking (if RPM policy allows)"))
    general_grp.add_argument("--color", dest="color", default=None,
                             help=_("control whether color is used"))
    general_grp.add_argument("--refresh", dest="freshest_metadata",
                             action="store_true",
                             help=_("set metadata as expired before running"
                                    " the command"))
    general_grp.add_argument("-4", dest="ip_resolve", default=None,
                             help=_("resolve to IPv4 addresses only"),
                             action="store_const", const='ipv4')
    general_grp.add_argument("-6", dest="ip_resolve", default=None,
                             help=_("resolve to IPv6 addresses only"),
                             action="store_const", const='ipv6')
    general_grp.add_argument("--destdir", "--downloaddir", dest="destdir", default=None,
                             help=_("set directory to copy packages to"))
    general_grp.add_argument("--downloadonly", dest="downloadonly",
                             action="store_true", default=False,
                             help=_("only download packages"))
    general_grp.add_argument("--comment", dest="comment", default=None,
                             help=_("add a comment to transaction"))
    # Updateinfo options...
    general_grp.add_argument("--bugfix", action="store_true",
                             help=_("Include bugfix relevant packages, "
                                    "in updates"))
    general_grp.add_argument("--enhancement", action="store_true",
                             help=_("Include enhancement relevant packages,"
                                    " in updates"))
    general_grp.add_argument("--newpackage", action="store_true",
                             help=_("Include newpackage relevant packages,"
                                    " in updates"))
    general_grp.add_argument("--security", action="store_true",
                             help=_("Include security relevant packages, "
                                    "in updates"))
    general_grp.add_argument("--advisory", "--advisories", dest="advisory",
                             default=[], action=self._SplitCallback,
                             help=_("Include packages needed to fix the "
                                    "given advisory, in updates"))
    general_grp.add_argument("--bz", "--bzs", default=[], dest="bugzilla",
                             action=self._SplitCallback, help=_(
                                 "Include packages needed to fix the given BZ, in updates"))
    general_grp.add_argument("--cve", "--cves", default=[], dest="cves",
                             action=self._SplitCallback,
                             help=_("Include packages needed to fix the given CVE, in updates"))
    general_grp.add_argument(
        "--sec-severity", "--secseverity",
        choices=['Critical', 'Important', 'Moderate', 'Low'], default=[],
        dest="severity", action=self._SplitCallback, help=_(
            "Include security relevant packages matching the severity, "
            "in updates"))
    general_grp.add_argument("--forcearch", metavar="ARCH",
                             dest=argparse.SUPPRESS,
                             action=self.ForceArchAction,
                             choices=sorted(dnf.rpm._BASEARCH_MAP.keys()),
                             help=_("Force the use of an architecture"))
    general_grp.add_argument('command', nargs='?', help=argparse.SUPPRESS)
def _add_cmd_usage(self, cmd, group):
    """ store usage info about a single dnf command.

    The command is keyed by its first alias.  A name that is already
    registered is left untouched (first registration wins) and its group
    is not added to the set of known groups.
    """
    summary = dnf.i18n.ucd(cmd.summary)
    name = dnf.i18n.ucd(cmd.aliases[0])
    if name in self._cmd_usage:
        return
    self._cmd_usage[name] = (group, summary)
    self._cmd_groups.add(group)
def add_commands(self, cli_cmds, group):
    """ store name & summary for dnf commands

    The stored information is used to build usage information
    grouped by built-in & plugin commands.  ``cli_cmds`` is a mapping whose
    values are the commands; each distinct value is registered once, even
    when several keys map to it.
    """
    distinct_commands = set(cli_cmds.values())
    for command in distinct_commands:
        self._add_cmd_usage(command, group)
def get_usage(self):
    """ get the usage information to show the user.

    The text starts with the generic usage line, followed by one section
    per known command group ('main', then 'plugin') listing the command
    names in sorted order with their summaries.
    """
    desc = {'main': _('List of Main Commands:'),
            'plugin': _('List of Plugin Commands:')}
    parts = ['%s [options] COMMAND\n' % dnf.util.MAIN_PROG]
    for grp in ('main', 'plugin'):
        if grp not in self._cmd_groups:
            # dont add plugin usage, if we dont have plugins
            continue
        parts.append("\n%s\n\n" % desc[grp])
        for name in sorted(self._cmd_usage):
            group, summary = self._cmd_usage[name]
            if group == grp:
                parts.append("%-25s %s\n" % (name, summary))
    return ''.join(parts)
def _add_command_options(self, command):
    """Prepare this parser for the options of ``command``.

    Sets the program name and description, creates a separate parser for
    the command's positional arguments (sharing this parser's
    ``print_usage``), creates the command-specific option group, and lets
    the command register its arguments through ``cmd_add_argument``.
    """
    basecmd = command._basecmd
    self.prog = "%s %s" % (dnf.util.MAIN_PROG, basecmd)
    self.description = command.summary

    positional = argparse.ArgumentParser(self.prog, add_help=False)
    positional.print_usage = self.print_usage
    positional._positionals.title = None
    self.command_positional_parser = positional

    group = self.add_argument_group(
        '{} command-specific options'.format(basecmd.capitalize()))
    # Route add_argument() calls made by the command through the
    # dispatcher that separates options from positional arguments.
    group.add_argument = self.cmd_add_argument
    group._command = basecmd
    self.command_group = group
    command.set_argparser(group)
def cmd_add_argument(self, *args, **kwargs):
    """Add a command argument to the right parser.

    When every name in ``args`` starts with a prefix character the
    argument is an option and goes to the command option group; otherwise
    it is added to the positional-argument parser.
    """
    starts_with_prefix = [name[0] in self.prefix_chars for name in args]
    if not all(starts_with_prefix):
        return self.command_positional_parser.add_argument(*args, **kwargs)
    group = self.command_group
    # Call the class implementation: the instance attribute add_argument
    # may itself point back to this method.
    return type(group).add_argument(group, *args, **kwargs)
def _check_encoding(self, args):
for arg in args:
try:
arg.encode('utf-8')
except UnicodeEncodeError as e:
raise dnf.exceptions.ConfigError(
_("Cannot encode argument '%s': %s") % (arg, str(e)))
def parse_main_args(self, args):
    """Parse the general options and return the resulting namespace.

    The arguments are checked for UTF-8 encodability first; arguments the
    parser does not recognise are ignored here.
    """
    self._check_encoding(args)
    known_and_rest = self.parse_known_args(args)
    return known_and_rest[0]
def parse_command_args(self, command, args):
    """Parse ``args`` for ``command`` and store the result in its ``opts``.

    Options are parsed by this parser; whatever it does not recognise is
    then parsed by the command's positional-argument parser into the same
    namespace.

    :return: the namespace, which is also assigned to ``command.opts``
    """
    self._add_command_options(command)
    options, leftover = self.parse_known_args(args)
    command.opts = self.command_positional_parser.parse_args(leftover, options)
    return command.opts
def print_usage(self, file_=None):
    """Print the usage, including the command's positional arguments.

    The actions of the positional-argument parser (if any) are merged into
    this parser's actions so that they appear in the usage line.  Each
    action is merged only once, so repeated calls do not duplicate the
    positional arguments in the output.

    :param file_: passed through to ``ArgumentParser.print_usage``
    """
    positional = self.command_positional_parser
    if positional:
        for action in positional._actions:
            if action not in self._actions:
                self._actions.append(action)
    super(OptionParser, self).print_usage(file_)
def print_help(self, command=None):
    """Print the help for ``command``, or the general help.

    With a command, its options are set up (unless the current command
    group already belongs to it) and the positional actions and the
    positional group are merged into this parser.  Without a command, the
    usage is replaced by the list of known commands.

    Actions and the group are merged only once, so repeated calls do not
    duplicate entries in the output.
    """
    # pylint: disable=W0212
    if command:
        if not self.command_group or self.command_group._command != command._basecmd:
            self._add_command_options(command)
        positional = self.command_positional_parser
        for action in positional._actions:
            if action not in self._actions:
                self._actions.append(action)
        if positional._positionals not in self._action_groups:
            self._action_groups.append(positional._positionals)
    else:
        self.usage = self.get_usage()
    super(OptionParser, self).print_help()
commands/swap.py 0000644 00000004563 15051234446 0007705 0 ustar 00 #
# Copyright (C) 2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import absolute_import
from __future__ import unicode_literals
from dnf.i18n import _
from dnf.cli import commands
import dnf.util
import logging
logger = logging.getLogger("dnf")
class SwapCommand(commands.Command):
    """A class containing methods needed by the cli to execute the swap command.

    The command runs the registered 'remove' command on one spec and then
    the registered 'install' command on another.
    """

    aliases = ('swap',)
    summary = _('run an interactive {prog} mod for remove and install one spec').format(
        prog=dnf.util.MAIN_PROG_UPPER)

    @staticmethod
    def set_argparser(parser):
        """Register the two positional specs."""
        parser.add_argument('remove_spec', action="store",
                            help=_('The specs that will be removed'))
        parser.add_argument('install_spec', action="store",
                            help=_('The specs that will be installed'))

    def configure(self):
        """Set the demands and run the GPG-key and enabled-repo checks."""
        demands = self.cli.demands
        for demand in ('sack_activation', 'available_repos', 'resolving', 'root_user'):
            setattr(demands, demand, True)
        commands._checkGPGKey(self.base, self.cli)
        commands._checkEnabledRepo(self.base, [self.opts.install_spec])

    def _perform(self, cmd_str, spec):
        """Run the command registered as ``cmd_str`` on ``spec``.

        Nothing happens when no such command is registered.
        """
        cmd_cls = self.cli.cli_commands.get(cmd_str)
        if cmd_cls is None:
            return
        cmd = cmd_cls(self.cli)
        self.cli.optparser.parse_command_args(cmd, [cmd_str, spec])
        cmd.run()

    def run(self):
        """Remove first, then install."""
        self._perform('remove', self.opts.remove_spec)
        self._perform('install', self.opts.install_spec)
commands/mark.py 0000644 00000006720 15051234446 0007662 0 ustar 00 # mark.py
# Mark CLI command.
#
# Copyright (C) 2015-2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import print_function
from __future__ import unicode_literals
import libdnf.transaction
from dnf.i18n import _
from dnf.cli import commands
import dnf
import functools
import logging
logger = logging.getLogger("dnf")
class MarkCommand(commands.Command):
    """CLI command that changes the recorded install reason of packages."""

    aliases = ('mark',)
    summary = _('mark or unmark installed packages as installed by user.')

    @staticmethod
    def set_argparser(parser):
        """Register the sub-action and the package specifications."""
        parser.add_argument('mark', nargs=1, choices=['install', 'remove', 'group'],
                            help=_("install: mark as installed by user\n"
                                   "remove: unmark as installed by user\n"
                                   "group: mark as installed by group"))
        parser.add_argument('package', nargs='+', metavar="PACKAGE",
                            help=_("Package specification"))

    def _mark_install(self, pkg):
        """Set the reason of ``pkg`` to USER."""
        self.base.history.set_reason(pkg, libdnf.transaction.TransactionItemReason_USER)
        logger.info(_('%s marked as user installed.'), str(pkg))

    def _mark_remove(self, pkg):
        """Set the reason of ``pkg`` to DEPENDENCY."""
        self.base.history.set_reason(pkg, libdnf.transaction.TransactionItemReason_DEPENDENCY)
        logger.info(_('%s unmarked as user installed.'), str(pkg))

    def _mark_group(self, pkg):
        """Set the reason of ``pkg`` to GROUP."""
        self.base.history.set_reason(pkg, libdnf.transaction.TransactionItemReason_GROUP)
        logger.info(_('%s marked as group installed.'), str(pkg))

    def configure(self):
        """Set the demands of the command."""
        demands = self.cli.demands
        demands.sack_activation = True
        demands.root_user = True
        demands.available_repos = False
        demands.resolving = False

    def run(self):
        """Apply the selected marking to every package matching the specs.

        :raises dnf.cli.CliError: when at least one spec matched nothing;
            packages matched by other specs have already been marked by then
        """
        cmd = self.opts.mark[0]
        specs = self.opts.package

        mark_func = getattr(self, '_mark_' + cmd)

        notfound = []
        for spec in specs:
            subj = dnf.subject.Subject(spec)
            q = subj.get_best_query(self.base.sack)
            # A separate loop variable keeps ``spec`` intact for the
            # not-found report below.
            for pkg in q:
                mark_func(pkg)
            if len(q) == 0:
                notfound.append(spec)

        if notfound:
            logger.error(_('Error:'))
            for spec in notfound:
                logger.error(_('Package %s is not installed.'), spec)
            raise dnf.cli.CliError

        old = self.base.history.last()
        if old is None:
            # Use the sack through self.base, like the query above does.
            rpmdb_version = self.base.sack._rpmdb_version()
        else:
            rpmdb_version = old.end_rpmdb_version

        self.base.history.beg(rpmdb_version, [], [])
        self.base.history.end(rpmdb_version)
commands/repoquery.py 0000644 00000103331 15051234446 0010757 0 ustar 00 #
# Copyright (C) 2014 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
from dnf.i18n import _
from dnf.cli import commands
from dnf.cli.option_parser import OptionParser
import argparse
import datetime
import logging
import re
import sys
import dnf
import dnf.cli
import dnf.exceptions
import dnf.subject
import dnf.util
import hawkey
logger = logging.getLogger('dnf')
QFORMAT_DEFAULT = '%{name}-%{epoch}:%{version}-%{release}.%{arch}'
# matches %[-][dd]{attr}
QFORMAT_MATCH = re.compile(r'%(-?\d*?){([:.\w]+?)}')

QUERY_TAGS = """\
name, arch, epoch, version, release, reponame (repoid), from_repo, evr,
debug_name, source_name, source_debug_name,
installtime, buildtime, size, downloadsize, installsize,
provides, requires, obsoletes, conflicts, sourcerpm,
description, summary, license, url, reason"""

# command-line option name -> attribute name used in the format string
OPTS_MAPPING = {
    'conflicts': 'conflicts',
    'enhances': 'enhances',
    'obsoletes': 'obsoletes',
    'provides': 'provides',
    'recommends': 'recommends',
    'requires': 'requires',
    'requires-pre': 'requires_pre',
    'suggests': 'suggests',
    'supplements': 'supplements'
}


def rpm2py_format(queryformat):
    """Convert a rpm like QUERYFMT to an python .format() string.

    Every ``%[-][width]{attr}`` tag becomes ``{0.attr[:spec]}`` with the
    attribute name lower-cased; a leading ``-`` in the width maps to the
    ``>`` alignment and a plain width to ``<``.  Literal braces outside tags
    are doubled so that ``str.format`` leaves them alone, and the two-
    character sequences ``\\n`` and ``\\t`` become a newline and a tab.
    """
    def _escape_braces(text):
        return text.replace('{', '{{').replace('}', '}}')

    def _convert_tag(match):
        width, attr = match.group(1), match.group(2)
        if not width:
            spec = width
        elif width[0] == '-':
            spec = ':>' + width[1:]
        else:
            spec = ':<' + width
        return '{0.' + attr.lower() + spec + "}"

    queryformat = queryformat.replace("\\n", "\n").replace("\\t", "\t")
    for option, attribute in OPTS_MAPPING.items():
        queryformat = queryformat.replace(option, attribute)

    pieces = []
    cursor = 0
    for match in QFORMAT_MATCH.finditer(queryformat):
        pieces.append(_escape_braces(queryformat[cursor:match.start()]))
        pieces.append(_convert_tag(match))
        cursor = match.end()
    pieces.append(_escape_braces(queryformat[cursor:]))
    return "".join(pieces)
# Variant of OptionParser._SplitCallback that overrides SPLITTER so that the
# option value is split at commas only (with optional surrounding whitespace).
class _CommaSplitCallback(OptionParser._SplitCallback):
SPLITTER = r'\s*,\s*'
class RepoQueryCommand(commands.Command):
"""A class containing methods needed by the cli to execute the repoquery command.
"""
nevra_forms = {'repoquery-n': hawkey.FORM_NAME,
'repoquery-na': hawkey.FORM_NA,
'repoquery-nevra': hawkey.FORM_NEVRA}
aliases = ('repoquery', 'rq') + tuple(nevra_forms.keys())
summary = _('search for packages matching keyword')
@staticmethod
def filter_repo_arch(opts, query):
    """Filter query by repoid and arch options

    The query is narrowed in place (``filterm``) for each of ``opts.repo``
    and ``opts.arches`` that is non-empty, and the same query is returned.
    """
    repo_ids = opts.repo
    if repo_ids:
        query.filterm(reponame=repo_ids)
    arches = opts.arches
    if arches:
        query.filterm(arch=arches)
    return query
@staticmethod
def set_argparser(parser):
    """Register all repoquery options and the positional KEY arguments."""
    parser.add_argument('-a', '--all', dest='queryall', action='store_true',
                        help=_("Query all packages (shorthand for repoquery '*' "
                               "or repoquery without argument)"))
    parser.add_argument('--show-duplicates', action='store_true',
                        help=_("Query all versions of packages (default)"))
    parser.add_argument('--arch', '--archlist', dest='arches', default=[],
                        action=_CommaSplitCallback, metavar='[arch]',
                        help=_('show only results from this ARCH'))
    parser.add_argument('-f', '--file', metavar='FILE', nargs='+',
                        help=_('show only results that owns FILE'))

    # The --what* options differ only in name and help text.
    what_options = (
        ('--whatconflicts', _('show only results that conflict REQ')),
        ('--whatdepends',
         _('shows results that requires, suggests, supplements, enhances,'
           'or recommends package provides and files REQ')),
        ('--whatobsoletes', _('show only results that obsolete REQ')),
        ('--whatprovides', _('show only results that provide REQ')),
        ('--whatrequires',
         _('shows results that requires package provides and files REQ')),
        ('--whatrecommends', _('show only results that recommend REQ')),
        ('--whatenhances', _('show only results that enhance REQ')),
        ('--whatsuggests', _('show only results that suggest REQ')),
        ('--whatsupplements', _('show only results that supplement REQ')),
    )
    for option_name, option_help in what_options:
        parser.add_argument(option_name, default=[], action=_CommaSplitCallback,
                            metavar='REQ', help=option_help)

    deps_mode = parser.add_mutually_exclusive_group()
    deps_mode.add_argument(
        "--alldeps", action="store_true",
        help=_("check non-explicit dependencies (files and Provides); default"))
    deps_mode.add_argument(
        "--exactdeps", action="store_true",
        help=_('check dependencies exactly as given, opposite of --alldeps'))
    parser.add_argument("--recursive", action="store_true", help=_(
        'used with --whatrequires, and --requires --resolve, query packages recursively.'))
    parser.add_argument('--deplist', action='store_true', help=_(
        "show a list of all dependencies and what packages provide them"))
    parser.add_argument('--resolve', action='store_true',
                        help=_('resolve capabilities to originating package(s)'))
    parser.add_argument("--tree", action="store_true",
                        help=_('show recursive tree for package(s)'))
    parser.add_argument('--srpm', action='store_true',
                        help=_('operate on corresponding source RPM'))
    parser.add_argument("--latest-limit", dest='latest_limit', type=int,
                        help=_('show N latest packages for a given name.arch'
                               ' (or latest but N if N is negative)'))
    parser.add_argument("--disable-modular-filtering", action="store_true",
                        help=_("list also packages of inactive module streams"))

    output_format = parser.add_mutually_exclusive_group()
    output_format.add_argument('-i', "--info", dest='queryinfo',
                               default=False, action='store_true',
                               help=_('show detailed information about the package'))
    output_format.add_argument('-l', "--list", dest='queryfilelist',
                               default=False, action='store_true',
                               help=_('show list of files in the package'))
    output_format.add_argument('-s', "--source", dest='querysourcerpm',
                               default=False, action='store_true',
                               help=_('show package source RPM name'))
    output_format.add_argument('--changelogs', dest='querychangelogs',
                               default=False, action='store_true',
                               help=_('show changelogs of the package'))
    output_format.add_argument('--qf', "--queryformat", dest='queryformat',
                               default=QFORMAT_DEFAULT,
                               help=_('display format for listing packages: '
                                      '"%%{name} %%{version} ...", '
                                      'use --querytags to view full tag list'))
    parser.add_argument('--querytags', action='store_true',
                        help=_('show available tags to use with '
                               '--queryformat'))
    output_format.add_argument(
        "--nevra", dest='queryformat', const=QFORMAT_DEFAULT,
        action='store_const',
        help=_('use name-epoch:version-release.architecture format for '
               'displaying found packages (default)'))
    output_format.add_argument(
        "--nvr", dest='queryformat', const='%{name}-%{version}-%{release}',
        action='store_const',
        help=_('use name-version-release format for '
               'displaying found packages '
               '(rpm query default)'))
    output_format.add_argument(
        "--envra", dest='queryformat',
        const='%{epoch}:%{name}-%{version}-%{release}.%{arch}',
        action='store_const',
        help=_('use epoch:name-version-release.architecture format for '
               'displaying found packages'))
    output_format.add_argument('--groupmember', action="store_true", help=_(
        'Display in which comps groups are presented selected packages'))

    pkg_filter = parser.add_mutually_exclusive_group()
    pkg_filter.add_argument("--duplicates", dest='pkgfilter',
                            const='duplicated', action='store_const',
                            help=_('limit the query to installed duplicate '
                                   'packages'))
    # hidden spelling variant of --duplicates
    pkg_filter.add_argument("--duplicated", dest='pkgfilter',
                            const='duplicated', action='store_const',
                            help=argparse.SUPPRESS)
    pkg_filter.add_argument("--installonly", dest='pkgfilter',
                            const='installonly', action='store_const',
                            help=_('limit the query to installed installonly packages'))
    pkg_filter.add_argument(
        "--unsatisfied", dest='pkgfilter',
        const='unsatisfied', action='store_const',
        help=_('limit the query to installed packages with unsatisfied dependencies'))
    parser.add_argument('--location', action='store_true',
                        help=_('show a location from where packages can be downloaded'))

    attribute_group = parser.add_mutually_exclusive_group()
    attribute_help = {
        'conflicts': _('Display capabilities that the package conflicts with.'),
        'depends': _('Display capabilities that the package can depend on, enhance, recommend,'
                     ' suggest, and supplement.'),
        'enhances': _('Display capabilities that the package can enhance.'),
        'provides': _('Display capabilities provided by the package.'),
        'recommends': _('Display capabilities that the package recommends.'),
        'requires': _('Display capabilities that the package depends on.'),
        'requires-pre': _('If the package is not installed display capabilities that it depends on for '
                          'running %%pre and %%post scriptlets. If the package is installed display '
                          'capabilities that is depends for %%pre, %%post, %%preun and %%postun.'),
        'suggests': _('Display capabilities that the package suggests.'),
        'supplements': _('Display capabilities that the package can supplement.')
    }
    for attribute, attribute_msg in attribute_help.items():
        attribute_group.add_argument('--%s' % attribute, dest='packageatr',
                                     action='store_const', const=attribute,
                                     help=attribute_msg)

    parser.add_argument('--available', action="store_true",
                        help=_('Display only available packages.'))

    listing_help = {
        'installed': _('Display only installed packages.'),
        'extras': _('Display only packages that are not present in any of available repositories.'),
        'upgrades': _('Display only packages that provide an upgrade for some already installed package.'),
        'unneeded': _('Display only packages that can be removed by "{prog} autoremove" '
                      'command.').format(prog=dnf.util.MAIN_PROG),
        'userinstalled': _('Display only packages that were installed by user.')
    }
    listing_group = parser.add_mutually_exclusive_group()
    for listing, listing_msg in listing_help.items():
        listing_group.add_argument('--%s' % listing, dest='list',
                                   action='store_const', const=listing,
                                   help=listing_msg)

    # make --autoremove hidden compatibility alias for --unneeded
    listing_group.add_argument(
        '--autoremove', dest='list', action='store_const',
        const="unneeded", help=argparse.SUPPRESS)
    parser.add_argument('--recent', action="store_true",
                        help=_('Display only recently edited packages'))

    parser.add_argument('key', nargs='*', metavar="KEY",
                        help=_('the key to search for'))
def pre_configure(self):
    """Reroute the CLI loggers unless quiet mode was requested.

    When ``self.opts.quiet`` is false, ``self.cli.redirect_logger`` is
    called with ``stdout=logging.WARNING`` and ``stderr=logging.INFO``.
    In quiet mode nothing is touched.
    """
    if self.opts.quiet:
        return
    self.cli.redirect_logger(stdout=logging.WARNING, stderr=logging.INFO)
def configure(self):
    """Validate option combinations and set the demands for this command.

    Reads the parsed options in ``self.opts`` and writes to
    ``self.cli.demands``.  ``--obsoletes`` is folded into
    ``self.opts.packageatr`` (or reported as a conflict when another
    package attribute switch was already given).  When ``--querytags`` is
    set the method returns early, before any further validation or demand
    setup.

    :raises dnf.cli.CliError: when ``--resolve``, ``--recursive``,
        ``--alldeps`` or ``--exactdeps`` is used without the option it
        depends on.
    """
    if not self.opts.quiet:
        self.cli.redirect_repo_progress()
    demands = self.cli.demands

    if self.opts.obsoletes:
        if self.opts.packageatr:
            self.cli._option_conflict("--obsoletes", "--" + self.opts.packageatr)
        else:
            self.opts.packageatr = "obsoletes"

    if self.opts.querytags:
        return

    if self.opts.resolve and not self.opts.packageatr:
        raise dnf.cli.CliError(
            _("Option '--resolve' has to be used together with one of the "
              "'--conflicts', '--depends', '--enhances', '--provides', '--recommends', "
              "'--requires', '--requires-pre', '--suggests' or '--supplements' options"))

    if self.opts.recursive:
        if self.opts.exactdeps:
            self.cli._option_conflict("--recursive", "--exactdeps")
        if not any([self.opts.whatrequires,
                    (self.opts.packageatr == "requires" and self.opts.resolve)]):
            raise dnf.cli.CliError(
                _("Option '--recursive' has to be used with '--whatrequires <REQ>' "
                  "(optionally with '--alldeps', but not with '--exactdeps'), or with "
                  "'--requires <REQ> --resolve'"))

    if self.opts.alldeps or self.opts.exactdeps:
        if not (self.opts.whatrequires or self.opts.whatdepends):
            # Translate the template first and substitute afterwards;
            # formatting inside _() would look up the already-filled string,
            # which never matches a catalog entry.
            raise dnf.cli.CliError(
                _("argument {} requires --whatrequires or --whatdepends option").format(
                    '--alldeps' if self.opts.alldeps else '--exactdeps'))

    if self.opts.srpm:
        self.base.repos.enable_source_repos()

    if (self.opts.list not in ["installed", "userinstalled"] and
            self.opts.pkgfilter != "installonly") or self.opts.available:
        demands.available_repos = True

    demands.sack_activation = True

    if self.opts.querychangelogs:
        demands.changelogs = True
def build_format_fn(self, opts, pkg):
    """Render one package according to the selected output mode.

    :param opts: parsed options; ``querychangelogs``, ``queryinfo``,
        ``queryfilelist``, ``querysourcerpm`` and ``queryformat`` are read.
    :param pkg: the package to render.
    :return: the changelog text, the ``infoOutput`` result, the wrapped
        file list, the wrapped source rpm, or the ``queryformat`` template
        filled from the wrapped package - whichever mode matches first.
    :raises dnf.exceptions.Error: when an attribute lookup fails while
        rendering one of the non-changelog modes.
    """
    if opts.querychangelogs:
        lines = ['Changelog for %s' % str(pkg)]
        for entry in pkg.changelogs:
            stamp = entry['timestamp']
            lines.append('* %s %s\n%s\n' % (stamp.strftime("%a %b %d %Y"),
                                            dnf.i18n.ucd(entry['author']),
                                            dnf.i18n.ucd(entry['text'])))
        return '\n'.join(lines)

    try:
        wrapped = PackageWrapper(pkg)
        if opts.queryinfo:
            return self.base.output.infoOutput(pkg)
        if opts.queryfilelist:
            files = wrapped.files
            if not files:
                print(_('Package {} contains no files').format(pkg), file=sys.stderr)
            return files
        if opts.querysourcerpm:
            return wrapped.sourcerpm
        return rpm2py_format(opts.queryformat).format(wrapped)
    except AttributeError as e:
        # The user asked for an attribute that does not exist on the
        # dnf Package object; report it as a regular dnf error.
        raise dnf.exceptions.Error(str(e))
def _resolve_nevras(self, nevras, base_query):
    """Resolve package specs to the matching packages of ``base_query``.

    Each entry of ``nevras`` is turned into a best-match query (provides
    and file names are not considered) and intersected with
    ``base_query``; the union of all intersections is returned.  An empty
    query is returned when ``nevras`` yields nothing.
    """
    resolved = self.base.sack.query().filterm(empty=True)
    for spec in nevras:
        best_match = dnf.subject.Subject(spec).get_best_query(
            self.base.sack,
            with_provides=False,
            with_filenames=False
        )
        resolved = resolved.union(base_query.intersection(best_match))
    return resolved
def _do_recursive_deps(self, query_in, query_select, done=None):
done = done if done else query_select
query_required = query_in.filter(requires=query_select)
query_select = query_required.difference(done)
done = query_required.union(done)
if query_select:
done = self._do_recursive_deps(query_in, query_select, done=done)
return done
def by_all_deps(self, names, query, all_dep_types=False):
    """Find packages in ``query`` that depend on ``names``.

    ``names`` is matched twice: directly as reldep globs, and as package
    specs resolved to packages via ``_resolve_nevras``.

    :param names: dependency globs / package specs to look for.
    :param query: query to search in.
    :param all_dep_types: also match recommends, enhances, supplements
        and suggests instead of requires only.
    :return: the matching packages; expanded through
        ``_do_recursive_deps`` when ``self.opts.recursive`` is set.
    """
    # in case of arguments being NEVRAs, resolve them to packages
    resolved = self._resolve_nevras(names, query)

    # the arguments taken directly as reldeps, then the resolved packages
    matched = query.filter(requires__glob=names)
    matched = matched.union(query.filter(requires=resolved))

    if all_dep_types:
        # TODO this is very inefficient, as it resolves the `names` glob to
        # reldeps four more times, which in a reasonably wide glob like
        # `dnf repoquery --whatdepends "libdnf*"` can take roughly 50% of
        # the total execution time.
        weak_kinds = ('recommends', 'enhances', 'supplements', 'suggests')
        for kind in weak_kinds:
            matched = matched.union(query.filter(**{kind + '__glob': names}))
        for kind in weak_kinds:
            matched = matched.union(query.filter(**{kind: resolved}))

    if self.opts.recursive:
        matched = self._do_recursive_deps(query, matched)

    return matched
def _get_recursive_providers_query(self, query_in, providers, done=None):
done = done if done else self.base.sack.query().filterm(empty=True)
t = self.base.sack.query().filterm(empty=True)
for pkg in providers.run():
t = t.union(query_in.filter(provides=pkg.requires))
query_select = t.difference(done)
if query_select:
done = self._get_recursive_providers_query(query_in, query_select, done=t.union(done))
return t.union(done)
def _add_add_remote_packages(self):
    """Register command-line keys that look like rpm files or URLs.

    A key qualifies when it ends with ``.rpm`` or when its URL scheme is
    one of http, ftp, file or https.  Qualifying keys are handed to
    ``self.base.add_remote_rpms`` (non-strict).

    :return: whatever ``add_remote_rpms`` returns, or an empty list when
        no key qualified.
    """
    url_schemes = ('http', 'ftp', 'file', 'https')
    candidates = []
    for key in self.opts.key:
        scheme = dnf.pycomp.urlparse.urlparse(key)[0]
        if key.endswith('.rpm') or (scheme and scheme in url_schemes):
            candidates.append(key)
    if not candidates:
        return []
    return self.base.add_remote_rpms(
        candidates, strict=False, progress=self.base.output.progress)
def run(self):
    """Execute the query described by ``self.opts`` and print the result.

    The steps, in order: build the base query (optionally restricted to
    the given keys and remote rpms), apply the list / package filters,
    apply the ``--what*`` dependency filters, then print the result in
    the selected output mode (tree, package attribute, location, deplist,
    group membership or formatted package lines).

    :raises dnf.exceptions.Error: when ``--available`` is combined with
        an incompatible list switch, or ``--tree`` is used without a
        supported switch.
    """
    if self.opts.querytags:
        print(QUERY_TAGS)
        return

    self.cli._populate_update_security_filter(self.opts)

    q = self.base.sack.query(
        flags=hawkey.IGNORE_MODULAR_EXCLUDES
        if self.opts.disable_modular_filtering
        else hawkey.APPLY_EXCLUDES
    )
    if self.opts.key:
        remote_packages = self._add_add_remote_packages()

        kwark = {}
        if self.opts.command in self.nevra_forms:
            kwark["forms"] = [self.nevra_forms[self.opts.command]]
        query_results = q.filter(empty=True)

        if remote_packages:
            query_results = query_results.union(
                self.base.sack.query().filterm(pkg=remote_packages))

        for key in self.opts.key:
            query_results = query_results.union(
                dnf.subject.Subject(key, ignore_case=True).get_best_query(
                    self.base.sack, with_provides=False, query=q, **kwark))
        q = query_results

    if self.opts.recent:
        q = q._recent(self.base.conf.recent)
    if self.opts.available:
        if self.opts.list and self.opts.list != "installed":
            # print_usage() writes the usage text itself and returns None,
            # so its result must not be passed to print().
            self.cli.optparser.print_usage()
            # translate the template first, substitute afterwards
            raise dnf.exceptions.Error(
                _("argument {}: not allowed with argument {}").format(
                    "--available", "--" + self.opts.list))
    elif self.opts.list == "unneeded":
        q = q._unneeded(self.base.history.swdb)
    elif self.opts.list and self.opts.list != 'userinstalled':
        # the remaining list switches are named after query methods
        q = getattr(q, self.opts.list)()

    if self.opts.pkgfilter == "duplicated":
        installonly = self.base._get_installonly_query(q)
        q = q.difference(installonly).duplicated()
    elif self.opts.pkgfilter == "installonly":
        q = self.base._get_installonly_query(q)
    elif self.opts.pkgfilter == "unsatisfied":
        rpmdb = dnf.sack.rpmdb_sack(self.base)
        rpmdb._configure(self.base.conf.installonlypkgs, self.base.conf.installonly_limit)
        goal = dnf.goal.Goal(rpmdb)
        goal.protect_running_kernel = False
        solved = goal.run(verify=True)
        if not solved:
            print(dnf.util._format_resolve_problems(goal.problem_rules()))
        return
    elif not self.opts.list:
        # do not show packages from @System repo
        q = q.available()

    # filter repo and arch
    q = self.filter_repo_arch(self.opts, q)
    # keep the query as it was before the --what* filters; tree output
    # searches in it
    orquery = q

    if self.opts.file:
        q.filterm(file__glob=self.opts.file)
    if self.opts.whatconflicts:
        rels = q.filter(conflicts__glob=self.opts.whatconflicts)
        q = rels.union(q.filter(conflicts=self._resolve_nevras(self.opts.whatconflicts, q)))
    if self.opts.whatobsoletes:
        q.filterm(obsoletes=self.opts.whatobsoletes)
    if self.opts.whatprovides:
        query_for_provide = q.filter(provides__glob=self.opts.whatprovides)
        if query_for_provide:
            q = query_for_provide
        else:
            # nothing provides it as a capability; try it as a file glob
            q.filterm(file__glob=self.opts.whatprovides)

    if self.opts.whatrequires:
        if (self.opts.exactdeps):
            q.filterm(requires__glob=self.opts.whatrequires)
        else:
            q = self.by_all_deps(self.opts.whatrequires, q)

    if self.opts.whatdepends:
        if (self.opts.exactdeps):
            dependsquery = q.filter(requires__glob=self.opts.whatdepends)
            dependsquery = dependsquery.union(q.filter(recommends__glob=self.opts.whatdepends))
            dependsquery = dependsquery.union(q.filter(enhances__glob=self.opts.whatdepends))
            dependsquery = dependsquery.union(q.filter(supplements__glob=self.opts.whatdepends))
            q = dependsquery.union(q.filter(suggests__glob=self.opts.whatdepends))
        else:
            q = self.by_all_deps(self.opts.whatdepends, q, True)

    if self.opts.whatrecommends:
        rels = q.filter(recommends__glob=self.opts.whatrecommends)
        q = rels.union(q.filter(recommends=self._resolve_nevras(self.opts.whatrecommends, q)))
    if self.opts.whatenhances:
        rels = q.filter(enhances__glob=self.opts.whatenhances)
        q = rels.union(q.filter(enhances=self._resolve_nevras(self.opts.whatenhances, q)))
    if self.opts.whatsupplements:
        rels = q.filter(supplements__glob=self.opts.whatsupplements)
        q = rels.union(q.filter(supplements=self._resolve_nevras(self.opts.whatsupplements, q)))
    if self.opts.whatsuggests:
        rels = q.filter(suggests__glob=self.opts.whatsuggests)
        q = rels.union(q.filter(suggests=self._resolve_nevras(self.opts.whatsuggests, q)))

    if self.opts.latest_limit:
        q = q.latest(self.opts.latest_limit)
    # reduce a query to security upgrades if they are specified
    q = self.base._merge_update_filters(q, warning=False)
    if self.opts.srpm:
        # replace every package by the source package(s) it was built from
        pkg_list = []
        for pkg in q:
            srcname = pkg.source_name
            if srcname is not None:
                tmp_query = self.base.sack.query().filterm(name=srcname, evr=pkg.evr,
                                                           arch='src')
                pkg_list += tmp_query.run()
        q = self.base.sack.query().filterm(pkg=pkg_list)
    if self.opts.tree:
        if not self.opts.whatrequires and self.opts.packageatr not in (
                'conflicts', 'enhances', 'obsoletes', 'provides', 'recommends',
                'requires', 'suggests', 'supplements'):
            raise dnf.exceptions.Error(
                _("No valid switch specified\nusage: {prog} repoquery [--conflicts|"
                  "--enhances|--obsoletes|--provides|--recommends|--requires|"
                  "--suggest|--supplements|--whatrequires] [key] [--tree]\n\n"
                  "description:\n For the given packages print a tree of the"
                  "packages.").format(prog=dnf.util.MAIN_PROG))
        self.tree_seed(q, orquery, self.opts)
        return

    pkgs = set()
    if self.opts.packageatr:
        rels = set()
        for pkg in q.run():
            if self.opts.list != 'userinstalled' or self.base.history.user_installed(pkg):
                if self.opts.packageatr == 'depends':
                    rels.update(pkg.requires + pkg.enhances + pkg.suggests +
                                pkg.supplements + pkg.recommends)
                else:
                    rels.update(getattr(pkg, OPTS_MAPPING[self.opts.packageatr]))
        if self.opts.resolve:
            # find the providing packages and show them
            if self.opts.list == "installed":
                query = self.filter_repo_arch(self.opts, self.base.sack.query())
            else:
                query = self.filter_repo_arch(self.opts, self.base.sack.query().available())
            providers = query.filter(provides=rels)
            if self.opts.recursive:
                providers = providers.union(
                    self._get_recursive_providers_query(query, providers))
            pkgs = set()
            for pkg in providers.latest().run():
                pkgs.add(self.build_format_fn(self.opts, pkg))
        else:
            pkgs.update(str(rel) for rel in rels)
    elif self.opts.location:
        for pkg in q.run():
            location = pkg.remote_location()
            if location is not None:
                pkgs.add(location)
    elif self.opts.deplist:
        # deplist output is ordered, so a list is used instead of a set
        pkgs = []
        for pkg in sorted(set(q.run())):
            if self.opts.list != 'userinstalled' or self.base.history.user_installed(pkg):
                deplist_output = []
                deplist_output.append('package: ' + str(pkg))
                for req in sorted([str(req) for req in pkg.requires]):
                    deplist_output.append(' dependency: ' + req)
                    subject = dnf.subject.Subject(req)
                    query = subject.get_best_query(self.base.sack)
                    query = self.filter_repo_arch(
                        self.opts, query.available())
                    if not self.opts.verbose:
                        query = query.latest()
                    for provider in query.run():
                        deplist_output.append(' provider: ' + str(provider))
                pkgs.append('\n'.join(deplist_output))
        if pkgs:
            print('\n\n'.join(pkgs))
        return
    elif self.opts.groupmember:
        self._group_member_report(q)
        return

    else:
        for pkg in q.run():
            if self.opts.list != 'userinstalled' or self.base.history.user_installed(pkg):
                pkgs.add(self.build_format_fn(self.opts, pkg))

    if pkgs:
        if self.opts.queryinfo:
            print("\n\n".join(sorted(pkgs)))
        else:
            print("\n".join(sorted(pkgs)))
def _group_member_report(self, query):
package_conf_dict = {}
for group in self.base.comps.groups:
package_conf_dict[group.id] = set([pkg.name for pkg in group.packages_iter()])
group_package_dict = {}
pkg_not_in_group = []
for pkg in query.run():
group_id_list = []
for group_id, package_name_set in package_conf_dict.items():
if pkg.name in package_name_set:
group_id_list.append(group_id)
if group_id_list:
group_package_dict.setdefault(
'$'.join(sorted(group_id_list)), []).append(str(pkg))
else:
pkg_not_in_group.append(str(pkg))
output = []
for key, package_list in sorted(group_package_dict.items()):
output.append(
'\n'.join(sorted(package_list) + sorted([' @' + id for id in key.split('$')])))
output.append('\n'.join(sorted(pkg_not_in_group)))
if output:
print('\n'.join(output))
def grow_tree(self, level, pkg, opts):
    """Print one line of the dependency tree for ``pkg``.

    :param level: nesting depth; ``-1`` marks a root, which is printed as
        the bare formatted package.
    :param pkg: package to print.
    :param opts: options forwarded to ``build_format_fn``.
    """
    label = self.build_format_fn(opts, pkg)
    if level == -1:
        print(label)
        return

    # one "| " marker per nesting level after the leading space
    indent = " " + "| " * level
    req_names = [str(requirement) for requirement in pkg.requires]
    summary = "[" + str(len(req_names)) + ": " + ", ".join(req_names) + "]"
    print(indent + r"\_ " + label + " " + summary)
def tree_seed(self, query, aquery, opts, level=-1, usedpkgs=None):
    """Print the dependency tree rooted at the packages of ``query``.

    :param query: packages to print on this level (visited sorted by name).
    :param aquery: query the children are searched in when no package
        attribute switch is active.
    :param opts: parsed options; ``packageatr`` and ``alldeps`` are read.
    :param level: current nesting depth, ``-1`` for the roots.
    :param usedpkgs: packages already expanded; started afresh for every
        root so each root gets its own full tree.
    """
    for pkg in sorted(set(query.run()), key=lambda p: p.name):
        if usedpkgs is None or level == -1:
            usedpkgs = set()
        if pkg.name.startswith(("rpmlib", "solvable")):
            return
        self.grow_tree(level, pkg, opts)
        if pkg in usedpkgs:
            # already expanded once on this tree; print it but do not descend
            continue
        usedpkgs.add(pkg)
        if opts.packageatr:
            # children are the providers of the selected package attribute,
            # de-duplicated by "name.arch"
            by_name_arch = {}
            for reldep in set(getattr(pkg, opts.packageatr)):
                for provider in self.base.sack.query().filterm(provides=reldep):
                    by_name_arch[provider.name + "." + provider.arch] = provider
            children = self.base.sack.query().filterm(pkg=list(by_name_arch.values()))
        elif opts.alldeps:
            children = self.by_all_deps((pkg.name, ), aquery)
        else:
            children = aquery.filter(requires__glob=pkg.name)
        self.tree_seed(children, aquery, opts, level + 1, usedpkgs)
class PackageWrapper(object):
    """Proxy around a package object that returns display-ready values.

    Attribute access falls through to the wrapped package: ``None``
    becomes ``"(none)"``, lists become their unique entries sorted and
    joined with newlines, and every other value is passed through
    ``dnf.i18n.ucd``.  ``buildtime`` and ``installtime`` are rendered as
    ``YYYY-MM-DD HH:MM`` in UTC.
    """

    def __init__(self, pkg):
        self._pkg = pkg

    def __getattr__(self, attr):
        value = getattr(self._pkg, attr)
        if value is None:
            return "(none)"
        if not isinstance(value, list):
            return dnf.i18n.ucd(value)
        # a set drops duplicate entries before sorting
        unique_entries = {dnf.i18n.ucd(entry) for entry in value}
        return '\n'.join(sorted(unique_entries))

    @staticmethod
    def _get_timestamp(timestamp):
        """Format a positive epoch timestamp; anything else gives ''."""
        if timestamp > 0:
            moment = datetime.datetime.utcfromtimestamp(timestamp)
            return moment.strftime("%Y-%m-%d %H:%M")
        return ''

    @property
    def buildtime(self):
        return self._get_timestamp(self._pkg.buildtime)

    @property
    def installtime(self):
        return self._get_timestamp(self._pkg.installtime)
commands/upgrademinimal.py 0000644 00000003407 15051234446 0011725 0 ustar 00 #
# Copyright (C) 2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import absolute_import
from __future__ import unicode_literals
from dnf.i18n import _
from dnf.cli.commands.upgrade import UpgradeCommand
class UpgradeMinimalCommand(UpgradeCommand):
    """CLI command behind ``upgrade-minimal`` / ``update-minimal`` / ``up-min``.

    Behaves like ``UpgradeCommand`` with ``upgrade_minimal`` switched on.
    """
    aliases = ('upgrade-minimal', 'update-minimal', 'up-min')
    summary = _("upgrade, but only 'newest' package match which fixes a problem"
                " that affects your system")

    def configure(self):
        """Run the parent configuration, then enable minimal-upgrade mode.

        ``all_security`` is set when none of the advisory filter options
        (bugfix, enhancement, newpackage, security, advisory, bugzilla,
        cves, severity) was given.
        """
        UpgradeCommand.configure(self)
        self.upgrade_minimal = True
        opts = self.opts
        advisory_filters = [
            opts.bugfix, opts.enhancement, opts.newpackage, opts.security,
            opts.advisory, opts.bugzilla, opts.cves, opts.severity,
        ]
        if not any(advisory_filters):
            self.all_security = True
commands/shell.py 0000644 00000023154 15051234446 0010037 0 ustar 00 # shell.py
# Shell CLI command.
#
# Copyright (C) 2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from dnf.cli import commands
from dnf.i18n import _, ucd
import dnf.util
import cmd
import copy
import dnf
import logging
import shlex
import sys
logger = logging.getLogger('dnf')
# only demands we'd like to override
class ShellDemandSheet(object):
    """Demand sheet for the shell command: the four demands below are on."""
    available_repos = resolving = root_user = sack_activation = True
class ShellCommand(commands.Command, cmd.Cmd):
    """Interactive shell command built on top of ``cmd.Cmd``.

    Every input line is split with ``shlex`` and parsed by the main option
    parser.  Shell-specific commands (see ``MAPPING``) are dispatched to
    the ``_<name>`` methods of this class; any other known CLI command is
    instantiated, configured and run in place.
    """

    aliases = ('shell', 'sh')
    summary = _('run an interactive {prog} shell').format(prog=dnf.util.MAIN_PROG_UPPER)

    # shell command word -> suffix of the handler method ('_' + suffix)
    MAPPING = {'repo': 'repo',
               'repository': 'repo',
               'exit': 'quit',
               'quit': 'quit',
               'run': 'ts_run',
               'ts': 'transaction',
               'transaction': 'transaction',
               'config': 'config',
               'resolvedep': 'resolve',
               'help': 'help'
               }

    def __init__(self, cli):
        commands.Command.__init__(self, cli)
        cmd.Cmd.__init__(self)
        self.prompt = '> '

    @staticmethod
    def set_argparser(parser):
        parser.add_argument('script', nargs='?', metavar=_('SCRIPT'),
                            help=_('Script to run in {prog} shell').format(
                                prog=dnf.util.MAIN_PROG_UPPER))

    def configure(self):
        """Install a ``ShellDemandSheet`` completed with the default demands."""
        # append to ShellDemandSheet missing demands from
        # dnf.cli.demand.DemandSheet with their default values.
        default_demands = self.cli.demands
        self.cli.demands = ShellDemandSheet()
        for attr in dir(default_demands):
            if attr.startswith('__'):
                continue
            try:
                getattr(self.cli.demands, attr)
            except AttributeError:
                setattr(self.cli.demands, attr, getattr(default_demands, attr))

    def run(self):
        """Run the given script, or start the interactive loop without one."""
        if self.opts.script:
            self._run_script(self.opts.script)
        else:
            self.cmdloop()

    def _clean(self):
        """Drop the current transaction and reload the sack."""
        self.base._finalize_base()
        self.base._transaction = None
        self.base.fill_sack()

    def onecmd(self, line):
        """Parse and execute a single shell input line.

        Empty lines are ignored, ``EOF`` is treated as ``quit`` and lines
        that cannot be tokenized or name an unknown command show the help.
        """
        if not line or line == '\n':
            return
        if line == 'EOF':
            line = 'quit'
        try:
            s_line = shlex.split(line)
        except ValueError:
            # shlex reports unbalanced quotes / dangling escapes this way;
            # a bare except would also swallow Ctrl-C and SystemExit
            self._help()
            return
        # reset option parser before each command, keep usage information
        self.cli.optparser.__init__(reset_usage=False)
        opts = self.cli.optparser.parse_main_args(s_line)
        # Disable shell recursion.
        if opts.command == 'shell':
            return
        if opts.command in self.MAPPING:
            getattr(self, '_' + self.MAPPING[opts.command])(s_line[1::])
        else:
            cmd_cls = self.cli.cli_commands.get(opts.command)
            if cmd_cls is not None:
                # named `command` so the imported `cmd` module is not shadowed
                command = cmd_cls(self.cli)
                try:
                    opts = self.cli.optparser.parse_command_args(command, s_line)
                except SystemExit:
                    # argparse.ArgumentParser prints usage information and executes
                    # sys.exit() on problems with parsing command line arguments
                    return
                try:
                    # each command works on its own copy of the shell demands
                    command.cli.demands = copy.deepcopy(self.cli.demands)
                    command.configure()
                    command.run()
                except dnf.exceptions.Error as e:
                    logger.error(_("Error:") + " " + ucd(e))
                    return
            else:
                self._help()

    def _config(self, args=None):
        """Print or set a configuration option.

        :param args: ``[key]`` to print or ``[key, value]`` to set; a key
            of the form ``<repo>.<option>`` addresses the matching
            repositories instead of the main configuration.
        """
        def print_or_set(key, val, conf):
            if val:
                setattr(conf, key, val)
            else:
                try:
                    print('{}: {}'.format(key, getattr(conf, str(key))))
                except Exception:
                    # unknown option; unlike a bare except this lets
                    # KeyboardInterrupt / SystemExit through
                    logger.warning(_('Unsupported key value.'))

        if not args or len(args) > 2:
            self._help('config')
            return

        key = args[0]
        val = args[1] if len(args) == 2 else None
        period = key.find('.')
        if period != -1:
            repo_name = key[:period]
            key = key[period+1:]
            repos = self.base.repos.get_matching(repo_name)
            for repo in repos:
                print_or_set(key, val, repo)
            if not repos:
                logger.warning(_('Could not find repository: %s'),
                               repo_name)
        else:
            print_or_set(key, val, self.base.conf)

    def _help(self, args=None):
        """Output help information.

        :param args: the command to output help information about. If
            *args* is empty, general help will be output.
        """
        arg = args[0] if isinstance(args, list) and len(args) > 0 else args
        msg = None

        if arg:
            if arg == 'config':
                msg = _("""{} arg [value]
arg: debuglevel, errorlevel, obsoletes, gpgcheck, assumeyes, exclude,
repo_id.gpgcheck, repo_id.exclude
If no value is given it prints the current value.
If value is given it sets that value.""").format(arg)

            elif arg == 'help':
                msg = _("""{} [command]
print help""").format(arg)

            elif arg in ['repo', 'repository']:
                msg = _("""{} arg [option]
list: lists repositories and their status. option = [all | id | glob]
enable: enable repositories. option = repository id
disable: disable repositories. option = repository id""").format(arg)

            elif arg == 'resolvedep':
                msg = _("""{}
resolve the transaction set""").format(arg)

            elif arg in ['transaction', 'ts']:
                msg = _("""{} arg
list: lists the contents of the transaction
reset: reset (zero-out) the transaction
run: run the transaction""").format(arg)

            elif arg == 'run':
                msg = _("""{}
run the transaction""").format(arg)

            elif arg in ['exit', 'quit']:
                msg = _("""{}
exit the shell""").format(arg)

        if not msg:
            # no (known) topic: general usage plus the shell-only commands
            self.cli.optparser.print_help()
            msg = _("""Shell specific arguments:
config set config options
help print help
repository (or repo) enable, disable or list repositories
resolvedep resolve the transaction set
transaction (or ts) list, reset or run the transaction set
run resolve and run the transaction set
exit (or quit) exit the shell""")

        print('\n' + msg)

    def _repo(self, args=None):
        """List, enable or disable repositories.

        :param args: ``[]``/``['list', ...]`` to list, or
            ``['enable'|'disable', repo, ...]``; anything else shows help.
        """
        # tolerate the default None so the list branch can slice args
        args = args or []
        action = args[0] if args else None

        if action in ['list', None]:
            self.onecmd('repolist ' + ' '.join(args[1:]))

        elif action in ['enable', 'disable']:
            repos = self.cli.base.repos
            fill_sack = False
            for repo in args[1::]:
                r = repos.get_matching(repo)
                if r:
                    getattr(r, action)()
                    fill_sack = True
                else:
                    logger.critical(_("Error:") + " " + _("Unknown repo: '%s'"),
                                    self.base.output.term.bold(repo))
            if fill_sack:
                self.base.fill_sack()
                # reset base._comps, as it has changed due to changing the repos
                self.base._comps = None

        else:
            self._help('repo')

    def _resolve(self, args=None):
        """Resolve the transaction; depsolve errors are printed, not raised."""
        try:
            self.cli.base.resolve(self.cli.demands.allow_erasing)
        except dnf.exceptions.DepsolveError as e:
            print(e)

    def _run_script(self, file):
        """Execute every line of ``file`` that does not start with '#'.

        Exits the process with status 1 when an ``IOError`` is raised.
        """
        try:
            with open(file, 'r') as fd:
                lines = fd.readlines()
                for line in lines:
                    if not line.startswith('#'):
                        self.onecmd(line)
        except IOError:
            logger.info(_('Error: Cannot open %s for reading'), self.base.output.term.bold(file))
            sys.exit(1)

    def _transaction(self, args=None):
        """List, reset or run the transaction.

        :param args: ``[]``/``['list']``, ``['reset']`` or ``['run']``;
            anything else shows the help for this command.
        """
        action = args[0] if args else None

        if action == 'reset':
            self._clean()
            return

        self._resolve()
        if action in ['list', None]:
            if self.base._transaction:
                out = self.base.output.list_transaction(self.base._transaction)
                logger.info(out)

        elif action == 'run':
            try:
                self.base.do_transaction()
            except dnf.exceptions.Error as e:
                logger.error(_("Error:") + " " + ucd(e))
            else:
                logger.info(_("Complete!"))
            # start from a clean state whether or not the run succeeded
            self._clean()

        else:
            self._help('transaction')

    def _ts_run(self, args=None):
        self._transaction(['run'])

    def _quit(self, args=None):
        logger.info(_('Leaving Shell'))
        sys.exit(0)
commands/reinstall.py 0000644 00000010135 15051234446 0010720 0 ustar 00 # reinstall.py
# Reinstall CLI command.
#
# Copyright (C) 2014-2016 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details. You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
from __future__ import absolute_import
from __future__ import unicode_literals
from dnf.cli import commands
from dnf.cli.option_parser import OptionParser
from dnf.i18n import _
import dnf.exceptions
import logging
logger = logging.getLogger('dnf')
class ReinstallCommand(commands.Command):
"""A class containing methods needed by the cli to execute the reinstall command.
"""
aliases = ('reinstall', 'rei')
summary = _('reinstall a package')
@staticmethod
def set_argparser(parser):
    """Register the positional ``packages`` argument (one or more values)."""
    argument_options = {
        'nargs': '+',
        'help': _('Package to reinstall'),
        'action': OptionParser.ParseSpecGroupFileCallback,
        'metavar': _('PACKAGE'),
    }
    parser.add_argument('packages', **argument_options)
def configure(self):
    """Set the demands of the reinstall command and run its pre-checks.

    Enables sack activation, available repositories, dependency
    resolving and the root-user demand, then runs the GPG key check.
    The enabled-repository check is run only when no rpm file names were
    given on the command line.
    """
    demands = self.cli.demands
    for flag in ('sack_activation', 'available_repos', 'resolving', 'root_user'):
        setattr(demands, flag, True)
    commands._checkGPGKey(self.base, self.cli)
    if self.opts.filenames:
        return
    commands._checkEnabledRepo(self.base)
def run(self):
    """Mark the requested rpm files, packages and groups for reinstall.

    :raises dnf.exceptions.Error: when nothing could be marked.
    """
    bold = self.base.output.term.bold
    marked = False

    # Reinstall files.
    local_pkgs = self.base.add_remote_rpms(
        self.opts.filenames, strict=False, progress=self.base.output.progress)
    for pkg in local_pkgs:
        try:
            self.base.package_reinstall(pkg)
        except dnf.exceptions.MarkingError:
            logger.info(_('No match for argument: %s'), bold(pkg.location))
        else:
            marked = True

    # Reinstall packages.
    group_specs = ['@' + x for x in self.opts.grp_specs]
    for pkg_spec in self.opts.pkg_specs + group_specs:
        try:
            self.base.reinstall(pkg_spec)
        except dnf.exceptions.PackagesNotInstalledError as err:
            # only the first available-but-not-installed package is reported
            for pkg in err.packages:
                logger.info(_('Package %s available, but not installed.'),
                            self.output.term.bold(pkg.name))
                break
            logger.info(_('No match for argument: %s'), bold(pkg_spec))
        except dnf.exceptions.PackagesNotAvailableError as err:
            for pkg in err.packages:
                pkgrepo = self.base.history.repo(pkg)
                xmsg = _(' (from %s)') % pkgrepo if pkgrepo else ''
                msg = _('Installed package %s%s not available.')
                logger.info(msg, bold(pkg), xmsg)
        except dnf.exceptions.MarkingError:
            assert False, 'Only the above marking errors are expected.'
        else:
            marked = True

    if not marked:
        raise dnf.exceptions.Error(_('No packages marked for reinstall.'))