# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""helper backend"""
__all__ = [ "FirewallHelper" ]
from firewall import errors
from firewall.errors import FirewallError
class FirewallHelper(object):
def __init__(self, fw):
self._fw = fw
self._helpers = { }
def __repr__(self):
return '%s(%r)' % (self.__class__, self._helpers)
# helpers
def cleanup(self):
self._helpers.clear()
def check_helper(self, name):
if name not in self.get_helpers():
raise FirewallError(errors.INVALID_HELPER, name)
def query_helper(self, name):
return name in self.get_helpers()
def get_helpers(self):
return sorted(self._helpers.keys())
def has_helpers(self):
return len(self._helpers) > 0
def get_helper(self, name):
self.check_helper(name)
return self._helpers[name]
def add_helper(self, obj):
self._helpers[obj.name] = obj
def remove_helper(self, name):
if name not in self._helpers:
raise FirewallError(errors.INVALID_HELPER, name)
del self._helpers[name]
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""The ipset command wrapper"""
__all__ = [ "ipset", "check_ipset_name", "remove_default_create_options" ]
import os.path
import ipaddress
from firewall import errors
from firewall.errors import FirewallError
from firewall.core.prog import runProg
from firewall.core.logger import log
from firewall.functions import tempFile, readfile
from firewall.config import COMMANDS
IPSET_MAXNAMELEN = 32
IPSET_TYPES = [
# bitmap and set types are currently not supported
# "bitmap:ip",
# "bitmap:ip,mac",
# "bitmap:port",
# "list:set",
"hash:ip",
"hash:ip,port",
"hash:ip,port,ip",
"hash:ip,port,net",
"hash:ip,mark",
"hash:net",
"hash:net,net",
"hash:net,port",
"hash:net,port,net",
"hash:net,iface",
"hash:mac",
]
IPSET_CREATE_OPTIONS = {
"family": "inet|inet6",
"hashsize": "value",
"maxelem": "value",
"timeout": "value in secs",
#"counters": None,
#"comment": None,
}
IPSET_DEFAULT_CREATE_OPTIONS = {
"family": "inet",
"hashsize": "1024",
"maxelem": "65536",
}
class ipset(object):
"""ipset command wrapper class"""
def __init__(self):
self._command = COMMANDS["ipset"]
self.name = "ipset"
def __run(self, args):
"""Call ipset with args"""
# convert to string list
_args = ["%s" % item for item in args]
log.debug2("%s: %s %s", self.__class__, self._command, " ".join(_args))
(status, ret) = runProg(self._command, _args)
if status != 0:
raise ValueError("'%s %s' failed: %s" % (self._command,
" ".join(_args), ret))
return ret
def check_name(self, name):
"""Check ipset name"""
if len(name) > IPSET_MAXNAMELEN:
raise FirewallError(errors.INVALID_NAME,
"ipset name '%s' is not valid" % name)
def set_supported_types(self):
"""Return types that are supported by the ipset command and kernel"""
ret = [ ]
output = ""
try:
output = self.__run(["--help"])
except ValueError as ex:
log.debug1("ipset error: %s" % ex)
lines = output.splitlines()
in_types = False
for line in lines:
#print(line)
if in_types:
splits = line.strip().split(None, 2)
if splits and splits[0] not in ret and splits[0] in IPSET_TYPES:
ret.append(splits[0])
if line.startswith("Supported set types:"):
in_types = True
return ret
def check_type(self, type_name):
"""Check ipset type"""
if len(type_name) > IPSET_MAXNAMELEN or type_name not in IPSET_TYPES:
raise FirewallError(errors.INVALID_TYPE,
"ipset type name '%s' is not valid" % type_name)
def set_create(self, set_name, type_name, options=None):
"""Create an ipset with name, type and options"""
self.check_name(set_name)
self.check_type(type_name)
args = [ "create", set_name, type_name ]
if isinstance(options, dict):
for key, val in options.items():
args.append(key)
if val != "":
args.append(val)
return self.__run(args)
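# Example (hypothetical set name and options): set_create("foo", "hash:ip",
# {"family": "inet", "timeout": "600"}) builds the argument list
# ["create", "foo", "hash:ip", "family", "inet", "timeout", "600"] and runs
# roughly:
#
#     ipset create foo hash:ip family inet timeout 600
#
# Options with an empty value only append the option name.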
def set_destroy(self, set_name):
self.check_name(set_name)
return self.__run([ "destroy", set_name ])
def set_add(self, set_name, entry):
args = [ "add", set_name, entry ]
return self.__run(args)
def set_delete(self, set_name, entry):
args = [ "del", set_name, entry ]
return self.__run(args)
def test(self, set_name, entry, options=None):
args = [ "test", set_name, entry ]
if options:
args.append("%s" % " ".join(options))
return self.__run(args)
def set_list(self, set_name=None, options=None):
args = [ "list" ]
if set_name:
args.append(set_name)
if options:
args.extend(options)
return self.__run(args).split("\n")
def set_get_active_terse(self):
""" Get active ipsets (only headers) """
lines = self.set_list(options=["-terse"])
ret = { }
_name = _type = None
_options = { }
for line in lines:
if len(line) < 1:
continue
pair = [ x.strip() for x in line.split(":", 1) ]
if len(pair) != 2:
continue
elif pair[0] == "Name":
_name = pair[1]
elif pair[0] == "Type":
_type = pair[1]
elif pair[0] == "Header":
splits = pair[1].split()
i = 0
while i < len(splits):
opt = splits[i]
if opt in [ "family", "hashsize", "maxelem", "timeout",
"netmask" ]:
if len(splits) > i + 1:
i += 1
_options[opt] = splits[i]
else:
log.error("Malformed ipset list -terse output: %s",
line)
return { }
i += 1
if _name and _type:
ret[_name] = (_type,
remove_default_create_options(_options))
_name = _type = None
_options.clear()
return ret
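# For reference, `ipset list -terse` prints blocks roughly like the following
# (example values; exact output may differ by ipset version):
#
#     Name: foo
#     Type: hash:ip
#     Header: family inet hashsize 4096 maxelem 65536 timeout 600
#
# The parser above keeps only Name, Type and Header and strips options that
# match IPSET_DEFAULT_CREATE_OPTIONS, so "foo" would map to
# ("hash:ip", {"hashsize": "4096", "timeout": "600"}).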
def save(self, set_name=None):
args = [ "save" ]
if set_name:
args.append(set_name)
return self.__run(args)
def set_restore(self, set_name, type_name, entries,
create_options=None, entry_options=None):
self.check_name(set_name)
self.check_type(type_name)
temp_file = tempFile()
if ' ' in set_name:
set_name = "'%s'" % set_name
args = [ "create", set_name, type_name, "-exist" ]
if create_options:
for key, val in create_options.items():
args.append(key)
if val != "":
args.append(val)
temp_file.write("%s\n" % " ".join(args))
temp_file.write("flush %s\n" % set_name)
for entry in entries:
if ' ' in entry:
entry = "'%s'" % entry
if entry_options:
temp_file.write("add %s %s %s\n" % \
(set_name, entry, " ".join(entry_options)))
else:
temp_file.write("add %s %s\n" % (set_name, entry))
temp_file.close()
stat = os.stat(temp_file.name)
log.debug2("%s: %s restore %s", self.__class__, self._command,
"%s: %d" % (temp_file.name, stat.st_size))
args = [ "restore" ]
(status, ret) = runProg(self._command, args,
stdin=temp_file.name)
if log.getDebugLogLevel() > 2:
try:
readfile(temp_file.name)
except Exception:
pass
else:
i = 1
for line in readfile(temp_file.name):
log.debug3("%8d: %s" % (i, line), nofmt=1, nl=0)
if not line.endswith("\n"):
log.debug3("", nofmt=1)
i += 1
os.unlink(temp_file.name)
if status != 0:
raise ValueError("'%s %s' failed: %s" % (self._command,
" ".join(args), ret))
return ret
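# The temporary restore file written above looks roughly like this for a
# hypothetical set "foo" of type hash:ip with create_options
# {"family": "inet"} and entries 10.0.0.1 and 10.0.0.2:
#
#     create foo hash:ip -exist family inet
#     flush foo
#     add foo 10.0.0.1
#     add foo 10.0.0.2
#
# The file is then fed to `ipset restore` on stdin.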
def set_flush(self, set_name):
args = [ "flush" ]
if set_name:
args.append(set_name)
return self.__run(args)
def rename(self, old_set_name, new_set_name):
return self.__run([ "rename", old_set_name, new_set_name ])
def swap(self, set_name_1, set_name_2):
return self.__run([ "swap", set_name_1, set_name_2 ])
def version(self):
return self.__run([ "version" ])
def check_ipset_name(name):
"""Return true if ipset name is valid"""
if len(name) > IPSET_MAXNAMELEN:
return False
return True
def remove_default_create_options(options):
""" Return only non default create options """
_options = options.copy()
for opt in IPSET_DEFAULT_CREATE_OPTIONS:
if opt in _options and \
IPSET_DEFAULT_CREATE_OPTIONS[opt] == _options[opt]:
del _options[opt]
return _options
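# Example (made-up values): remove_default_create_options(
#     {"family": "inet", "hashsize": "1024", "maxelem": "65536",
#      "timeout": "600"})
# returns {"timeout": "600"}, since the other three options match
# IPSET_DEFAULT_CREATE_OPTIONS exactly.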
def normalize_ipset_entry(entry):
""" Normalize IP addresses in entry """
_entry = []
for _part in entry.split(","):
try:
_part.index("/")
_entry.append(str(ipaddress.ip_network(_part, strict=False)))
except ValueError:
_entry.append(_part)
return ",".join(_entry)
def check_entry_overlaps_existing(entry, entries):
""" Check if entry overlaps any entry in the list of entries """
# Only check simple types
if len(entry.split(",")) > 1:
return
try:
entry_network = ipaddress.ip_network(entry, strict=False)
except ValueError:
# could not parse the new IP address, maybe a MAC
return
for itr in entries:
if entry_network.overlaps(ipaddress.ip_network(itr, strict=False)):
raise FirewallError(errors.INVALID_ENTRY, "Entry '{}' overlaps with existing entry '{}'".format(entry, itr))
def check_for_overlapping_entries(entries):
""" Check if any entry overlaps any entry in the list of entries """
try:
entries = [ipaddress.ip_network(x, strict=False) for x in entries]
except ValueError:
# at least one entry can not be parsed
return
if len(entries) == 0:
return
# We can take advantage of some facts of IPv4Network/IPv6Network and
# how Python sorts the networks to quickly detect overlaps.
#
# Facts:
#
# 1. IPv{4,6}Network are normalized to remove host bits, e.g.
# 10.1.1.0/16 will become 10.1.0.0/16.
#
# 2. IPv{4,6}Network objects are sorted by:
# a. IP address (network bits)
# then
# b. netmask (significant bits count)
#
# Because of the above we have these properties:
#
# 1. big networks (netA) are sorted before smaller networks (netB)
# that overlap the big network (netA)
# - e.g. 10.1.128.0/17 (netA) sorts before 10.1.129.0/24 (netB)
# 2. same value addresses (network bits) are grouped together even
# if the number of network bits vary. e.g. /16 vs /24
# - recall that addresses are normalized to remove host bits
# - e.g. 10.1.128.0/17 (netA) sorts before 10.1.128.0/24 (netC)
# 3. non-overlapping networks (netD, netE) are always sorted before or
# after networks that overlap (netB, netC) the current one (netA)
# - e.g. 10.1.128.0/17 (netA) sorts before 10.2.128.0/16 (netD)
# - e.g. 10.1.128.0/17 (netA) sorts after 9.1.128.0/17 (netE)
# - e.g. 9.1.128.0/17 (netE) sorts before 10.1.129.0/24 (netB)
#
# With this we know the sorted list looks like:
#
# list: [ netE, netA, netB, netC, netD ]
#
# netE = non-overlapping network
# netA = big network
# netB = smaller network that overlaps netA (subnet)
# netC = smaller network that overlaps netA (subnet)
# netD = non-overlapping network
#
# If networks netB and netC exist in the list, they overlap and are
# adjacent to netA.
#
# Checking for overlaps on a sorted list is thus:
#
# 1. compare adjacent elements in the list for overlaps
#
# Recall that we only need to detect a single overlap. We do not need to
# detect them all.
#
entries.sort()
prev_network = entries.pop(0)
for current_network in entries:
if prev_network.overlaps(current_network):
raise FirewallError(errors.INVALID_ENTRY, "Entry '{}' overlaps entry '{}'".format(prev_network, current_network))
prev_network = current_network
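# Example (made-up entries): with ["10.1.128.0/17", "10.1.129.0/24"] the
# sorted list places the /17 directly before the /24 it contains, so the
# adjacent-pair check raises FirewallError(INVALID_ENTRY). Disjoint entries
# such as ["10.1.0.0/16", "192.168.1.0/24"] pass without error, and entries
# that do not parse as networks (e.g. MAC addresses) skip the check entirely.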
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Functions for NetworkManager interaction"""
__all__ = [ "check_nm_imported", "nm_is_imported",
"nm_get_zone_of_connection", "nm_set_zone_of_connection",
"nm_get_connections", "nm_get_connection_of_interface",
"nm_get_bus_name", "nm_get_dbus_interface" ]
import gi
from gi.repository import GLib
try:
gi.require_version('NM', '1.0')
except ValueError:
_nm_imported = False
else:
try:
from gi.repository import NM
_nm_imported = True
except (ImportError, ValueError, GLib.Error):
_nm_imported = False
_nm_client = None
from firewall import errors
from firewall.errors import FirewallError
from firewall.core.logger import log
import dbus
def check_nm_imported():
"""Check function to raise a MISSING_IMPORT error if the import of NM failed
"""
if not _nm_imported:
raise FirewallError(errors.MISSING_IMPORT, "gi.repository.NM = 1.0")
def nm_is_imported():
"""Returns true if NM has been properly imported
@return True if import was successful, False otherwise
"""
return _nm_imported
def nm_get_client():
"""Returns the NM client object or None if the import of NM failed
@return NM.Client instance if import was successful, None otherwise
"""
global _nm_client
if not _nm_client:
_nm_client = NM.Client.new(None)
return _nm_client
def nm_get_zone_of_connection(connection):
"""Get zone of connection from NM
@param connection name
@return zone string setting of connection, empty string if not set, None if connection is unknown
"""
check_nm_imported()
con = nm_get_client().get_connection_by_uuid(connection)
if con is None:
return None
setting_con = con.get_setting_connection()
if setting_con is None:
return None
try:
if con.get_flags() & (NM.SettingsConnectionFlags.NM_GENERATED
| NM.SettingsConnectionFlags.NM_VOLATILE):
return ""
except AttributeError:
# Prior to NetworkManager 1.12, we can only guess
# that a connection was generated/volatile.
if con.get_unsaved():
return ""
zone = setting_con.get_zone()
if zone is None:
zone = ""
return zone
def nm_set_zone_of_connection(zone, connection):
"""Set the zone for a connection
@param zone name
@param connection name
@return True if zone was set, else False
"""
check_nm_imported()
con = nm_get_client().get_connection_by_uuid(connection)
if con is None:
return False
setting_con = con.get_setting_connection()
if setting_con is None:
return False
if zone == "":
zone = None
setting_con.set_property("zone", zone)
return con.commit_changes(True, None)
def nm_get_connections(connections, connections_name):
"""Get active connections from NM
@param connections return dict
@param connections_name return dict
"""
connections.clear()
connections_name.clear()
check_nm_imported()
active_connections = nm_get_client().get_active_connections()
for active_con in active_connections:
# ignore vpn devices for now
if active_con.get_vpn():
continue
name = active_con.get_id()
uuid = active_con.get_uuid()
devices = active_con.get_devices()
connections_name[uuid] = name
for dev in devices:
ip_iface = dev.get_ip_iface()
if ip_iface:
connections[ip_iface] = uuid
def nm_get_interfaces():
"""Get active interfaces from NM
@returns list of interface names
"""
check_nm_imported()
active_interfaces = []
for active_con in nm_get_client().get_active_connections():
# ignore vpn devices for now
if active_con.get_vpn():
continue
try:
con = active_con.get_connection()
if con.get_flags() & (NM.SettingsConnectionFlags.NM_GENERATED
| NM.SettingsConnectionFlags.NM_VOLATILE):
continue
except AttributeError:
# Prior to NetworkManager 1.12, we can only guess
# that a connection was generated/volatile.
if con.get_unsaved():
continue
for dev in active_con.get_devices():
ip_iface = dev.get_ip_iface()
if ip_iface:
active_interfaces.append(ip_iface)
return active_interfaces
def nm_get_interfaces_in_zone(zone):
interfaces = []
for interface in nm_get_interfaces():
conn = nm_get_connection_of_interface(interface)
if zone == nm_get_zone_of_connection(conn):
interfaces.append(interface)
return interfaces
def nm_get_device_by_ip_iface(interface):
"""Get device from NM which has the given IP interface
@param interface name
@returns NM.Device instance or None
"""
check_nm_imported()
for device in nm_get_client().get_devices():
ip_iface = device.get_ip_iface()
if ip_iface is None:
continue
if ip_iface == interface:
return device
return None
def nm_get_connection_of_interface(interface):
"""Get connection from NM that is using the interface
@param interface name
@returns connection that is using interface or None
"""
check_nm_imported()
device = nm_get_device_by_ip_iface(interface)
if device is None:
return None
active_con = device.get_active_connection()
if active_con is None:
return None
try:
con = active_con.get_connection()
if con.get_flags() & NM.SettingsConnectionFlags.NM_GENERATED:
return None
except AttributeError:
# Prior to NetworkManager 1.12, we can only guess
# that a connection was generated.
if con.get_unsaved():
return None
return active_con.get_uuid()
def nm_get_bus_name():
if not _nm_imported:
return None
try:
bus = dbus.SystemBus()
obj = bus.get_object(NM.DBUS_INTERFACE, NM.DBUS_PATH)
name = obj.bus_name
del obj, bus
return name
except Exception:
log.debug2("Failed to get bus name of NetworkManager")
return None
def nm_get_dbus_interface():
if not _nm_imported:
return ""
return NM.DBUS_INTERFACE
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__all__ = [ "FirewallDirect" ]
from firewall.fw_types import LastUpdatedOrderedDict
from firewall.core import ipXtables
from firewall.core import ebtables
from firewall.core.fw_transaction import FirewallTransaction
from firewall.core.logger import log
from firewall import errors
from firewall.errors import FirewallError
############################################################################
#
# class FirewallDirect
#
############################################################################
class FirewallDirect(object):
def __init__(self, fw):
self._fw = fw
self.__init_vars()
def __repr__(self):
return '%s(%r, %r, %r)' % (self.__class__, self._chains, self._rules,
self._rule_priority_positions)
def __init_vars(self):
self._chains = { }
self._rules = { }
self._rule_priority_positions = { }
self._passthroughs = { }
self._obj = None
def cleanup(self):
self.__init_vars()
# transaction
def new_transaction(self):
return FirewallTransaction(self._fw)
# configuration
def set_permanent_config(self, obj):
self._obj = obj
def has_runtime_configuration(self):
if len(self._chains) + len(self._rules) + len(self._passthroughs) > 0:
return True
return False
def has_configuration(self):
if self.has_runtime_configuration():
return True
if len(self._obj.get_all_chains()) + \
len(self._obj.get_all_rules()) + \
len(self._obj.get_all_passthroughs()) > 0:
return True
return False
def apply_direct(self, use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
# Apply permanent configuration and save the obj to be able to
# remove permanent configuration settings within get_runtime_config
# for use in firewalld reload.
self.set_config((self._obj.get_all_chains(),
self._obj.get_all_rules(),
self._obj.get_all_passthroughs()),
transaction)
if use_transaction is None:
transaction.execute(True)
def get_runtime_config(self):
# Return only runtime changes
# Remove all chains, rules and passthroughs that are in self._obj
# (permanent config applied in firewalld _start).
chains = { }
rules = { }
passthroughs = { }
for table_id in self._chains:
(ipv, table) = table_id
for chain in self._chains[table_id]:
if not self._obj.query_chain(ipv, table, chain):
chains.setdefault(table_id, [ ]).append(chain)
for chain_id in self._rules:
(ipv, table, chain) = chain_id
for (priority, args) in self._rules[chain_id]:
if not self._obj.query_rule(ipv, table, chain, priority, args):
if chain_id not in rules:
rules[chain_id] = LastUpdatedOrderedDict()
rules[chain_id][(priority, args)] = priority
for ipv in self._passthroughs:
for args in self._passthroughs[ipv]:
if not self._obj.query_passthrough(ipv, args):
if ipv not in passthroughs:
passthroughs[ipv] = [ ]
passthroughs[ipv].append(args)
return (chains, rules, passthroughs)
def get_config(self):
return (self._chains, self._rules, self._passthroughs)
def set_config(self, conf, use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
(_chains, _rules, _passthroughs) = conf
for table_id in _chains:
(ipv, table) = table_id
for chain in _chains[table_id]:
if not self.query_chain(ipv, table, chain):
try:
self.add_chain(ipv, table, chain,
use_transaction=transaction)
except FirewallError as error:
log.warning(str(error))
for chain_id in _rules:
(ipv, table, chain) = chain_id
for (priority, args) in _rules[chain_id]:
if not self.query_rule(ipv, table, chain, priority, args):
try:
self.add_rule(ipv, table, chain, priority, args,
use_transaction=transaction)
except FirewallError as error:
log.warning(str(error))
for ipv in _passthroughs:
for args in _passthroughs[ipv]:
if not self.query_passthrough(ipv, args):
try:
self.add_passthrough(ipv, args,
use_transaction=transaction)
except FirewallError as error:
log.warning(str(error))
if use_transaction is None:
transaction.execute(True)
def _check_ipv(self, ipv):
ipvs = ['ipv4', 'ipv6', 'eb']
if ipv not in ipvs:
raise FirewallError(errors.INVALID_IPV,
"'%s' not in '%s'" % (ipv, ipvs))
def _check_ipv_table(self, ipv, table):
self._check_ipv(ipv)
tables = ipXtables.BUILT_IN_CHAINS.keys() if ipv in [ 'ipv4', 'ipv6' ] \
else ebtables.BUILT_IN_CHAINS.keys()
if table not in tables:
raise FirewallError(errors.INVALID_TABLE,
"'%s' not in '%s'" % (table, tables))
def _check_builtin_chain(self, ipv, table, chain):
if ipv in ['ipv4', 'ipv6']:
built_in_chains = ipXtables.BUILT_IN_CHAINS[table]
if self._fw.nftables_enabled:
our_chains = {}
else:
our_chains = self._fw.get_direct_backend_by_ipv(ipv).our_chains[table]
else:
built_in_chains = ebtables.BUILT_IN_CHAINS[table]
our_chains = ebtables.OUR_CHAINS[table]
if chain in built_in_chains:
raise FirewallError(errors.BUILTIN_CHAIN,
"chain '%s' is built-in chain" % chain)
if chain in our_chains:
raise FirewallError(errors.BUILTIN_CHAIN,
"chain '%s' is reserved" % chain)
if ipv in [ "ipv4", "ipv6" ]:
if self._fw.zone.zone_from_chain(chain) is not None:
raise FirewallError(errors.INVALID_CHAIN,
"Chain '%s' is reserved" % chain)
def _register_chain(self, table_id, chain, add):
if add:
self._chains.setdefault(table_id, [ ]).append(chain)
else:
self._chains[table_id].remove(chain)
if len(self._chains[table_id]) == 0:
del self._chains[table_id]
def add_chain(self, ipv, table, chain, use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
if self._fw.may_skip_flush_direct_backends():
transaction.add_pre(self._fw.flush_direct_backends)
#TODO: policy="ACCEPT"
self._chain(True, ipv, table, chain, transaction)
if use_transaction is None:
transaction.execute(True)
def remove_chain(self, ipv, table, chain, use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
self._chain(False, ipv, table, chain, transaction)
if use_transaction is None:
transaction.execute(True)
def query_chain(self, ipv, table, chain):
self._check_ipv_table(ipv, table)
self._check_builtin_chain(ipv, table, chain)
table_id = (ipv, table)
return (table_id in self._chains and
chain in self._chains[table_id])
def get_chains(self, ipv, table):
self._check_ipv_table(ipv, table)
table_id = (ipv, table)
if table_id in self._chains:
return self._chains[table_id]
return [ ]
def get_all_chains(self):
r = [ ]
for key in self._chains:
(ipv, table) = key
for chain in self._chains[key]:
r.append((ipv, table, chain))
return r
def add_rule(self, ipv, table, chain, priority, args, use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
if self._fw.may_skip_flush_direct_backends():
transaction.add_pre(self._fw.flush_direct_backends)
self._rule(True, ipv, table, chain, priority, args, transaction)
if use_transaction is None:
transaction.execute(True)
def remove_rule(self, ipv, table, chain, priority, args,
use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
self._rule(False, ipv, table, chain, priority, args, transaction)
if use_transaction is None:
transaction.execute(True)
def query_rule(self, ipv, table, chain, priority, args):
self._check_ipv_table(ipv, table)
chain_id = (ipv, table, chain)
return chain_id in self._rules and \
(priority, args) in self._rules[chain_id]
def get_rules(self, ipv, table, chain):
self._check_ipv_table(ipv, table)
chain_id = (ipv, table, chain)
if chain_id in self._rules:
return list(self._rules[chain_id].keys())
return [ ]
def get_all_rules(self):
r = [ ]
for key in self._rules:
(ipv, table, chain) = key
for (priority, args) in self._rules[key]:
r.append((ipv, table, chain, priority, list(args)))
return r
def _register_rule(self, rule_id, chain_id, priority, enable, count):
if enable:
if chain_id not in self._rules:
self._rules[chain_id] = LastUpdatedOrderedDict()
self._rules[chain_id][rule_id] = priority
if chain_id not in self._rule_priority_positions:
self._rule_priority_positions[chain_id] = { }
if priority in self._rule_priority_positions[chain_id]:
self._rule_priority_positions[chain_id][priority] += count
else:
self._rule_priority_positions[chain_id][priority] = count
else:
del self._rules[chain_id][rule_id]
if len(self._rules[chain_id]) == 0:
del self._rules[chain_id]
self._rule_priority_positions[chain_id][priority] -= count
# DIRECT PASSTHROUGH (untracked)
def passthrough(self, ipv, args):
try:
return self._fw.rule(self._fw.get_direct_backend_by_ipv(ipv).name, args)
except Exception as msg:
log.debug2(msg)
raise FirewallError(errors.COMMAND_FAILED, msg)
def _register_passthrough(self, ipv, args, enable):
if enable:
if ipv not in self._passthroughs:
self._passthroughs[ipv] = [ ]
self._passthroughs[ipv].append(args)
else:
self._passthroughs[ipv].remove(args)
if len(self._passthroughs[ipv]) == 0:
del self._passthroughs[ipv]
def add_passthrough(self, ipv, args, use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
if self._fw.may_skip_flush_direct_backends():
transaction.add_pre(self._fw.flush_direct_backends)
self._passthrough(True, ipv, list(args), transaction)
if use_transaction is None:
transaction.execute(True)
def remove_passthrough(self, ipv, args, use_transaction=None):
if use_transaction is None:
transaction = self.new_transaction()
else:
transaction = use_transaction
self._passthrough(False, ipv, list(args), transaction)
if use_transaction is None:
transaction.execute(True)
def query_passthrough(self, ipv, args):
return ipv in self._passthroughs and \
tuple(args) in self._passthroughs[ipv]
def get_all_passthroughs(self):
r = [ ]
for ipv in self._passthroughs:
for args in self._passthroughs[ipv]:
r.append((ipv, list(args)))
return r
def get_passthroughs(self, ipv):
r = [ ]
if ipv in self._passthroughs:
for args in self._passthroughs[ipv]:
r.append(list(args))
return r
def split_value(self, rules, opts):
"""Split values combined with commas for options in opts"""
out_rules = [ ]
for rule in rules:
processed = False
for opt in opts:
try:
i = rule.index(opt)
except ValueError:
pass
else:
if len(rule) > i + 1 and "," in rule[i+1]:
# For all items in the comma separated list in index
# i of the rule, a new rule is created with a single
# item from this list
processed = True
items = rule[i+1].split(",")
for item in items:
_rule = rule[:]
_rule[i+1] = item
out_rules.append(_rule)
if not processed:
out_rules.append(rule)
return out_rules
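# Example (hypothetical direct rule args): split_value(
#     [["-s", "10.0.0.1,10.0.0.2", "-j", "ACCEPT"]], ["-s", "--source"])
# returns two rules, one with source 10.0.0.1 and one with 10.0.0.2, because
# iptables-restore cannot handle a comma-separated value in a single rule.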
def _rule(self, enable, ipv, table, chain, priority, args, transaction):
self._check_ipv_table(ipv, table)
# Do not create zone chains if we're using nftables. Only allow direct
# rules in the built in chains.
if not self._fw.nftables_enabled \
and ipv in [ "ipv4", "ipv6" ]:
self._fw.zone.create_zone_base_by_chain(ipv, table, chain,
transaction)
_chain = chain
backend = self._fw.get_direct_backend_by_ipv(ipv)
# if nftables is in use, just put the direct rules in the chain
# specified by the user. i.e. don't append _direct.
if not self._fw.nftables_enabled \
and backend.is_chain_builtin(ipv, table, chain):
_chain = "%s_direct" % (chain)
elif self._fw.nftables_enabled and chain[-7:] == "_direct" \
and backend.is_chain_builtin(ipv, table, chain[:-7]):
# strip _direct suffix. If we're using nftables we don't bother
# creating the *_direct chains for builtin chains.
_chain = chain[:-7]
chain_id = (ipv, table, chain)
rule_id = (priority, args)
if enable:
if chain_id in self._rules and \
rule_id in self._rules[chain_id]:
raise FirewallError(errors.ALREADY_ENABLED,
"rule '%s' already is in '%s:%s:%s'" % \
(args, ipv, table, chain))
else:
if chain_id not in self._rules or \
rule_id not in self._rules[chain_id]:
raise FirewallError(errors.NOT_ENABLED,
"rule '%s' is not in '%s:%s:%s'" % \
(args, ipv, table, chain))
# get priority of rule
priority = self._rules[chain_id][rule_id]
# If a rule gets added, the initial rule index position within the
# ipv, table and chain combination (chain_id) is 1.
# If the chain_id exists in _rule_priority_positions, there are already
# other rules for this chain_id. The number of rules for a priority
# less or equal to the priority of the new rule will increase the
# index of the new rule. The index is the ip*tables -I insert rule
# number.
#
# Example: We have the following rules for chain_id (ipv4, filter,
# INPUT) already:
# ipv4, filter, INPUT, 1, -i, foo1, -j, ACCEPT
# ipv4, filter, INPUT, 2, -i, foo2, -j, ACCEPT
# ipv4, filter, INPUT, 2, -i, foo2_1, -j, ACCEPT
# ipv4, filter, INPUT, 3, -i, foo3, -j, ACCEPT
# This results in the following _rule_priority_positions structure:
# _rule_priority_positions[(ipv4,filter,INPUT)][1] = 1
# _rule_priority_positions[(ipv4,filter,INPUT)][2] = 2
# _rule_priority_positions[(ipv4,filter,INPUT)][3] = 1
# The new rule
# ipv4, filter, INPUT, 2, -i, foo2_2, -j, ACCEPT
# has the same priority as the second rule before and will be added
# right after it.
# The initial index is 1 and the chain_id is already in
# _rule_priority_positions. Therefore the index will increase for
# the number of rules in every rule position in
# _rule_priority_positions[(ipv4,filter,INPUT)].keys()
# where position is smaller or equal to the entry in keys.
# With the example from above:
# The priority of the new rule is 2. Therefore for all keys in
# _rule_priority_positions[chain_id] where priority is 1 or 2, the
# number of the rules will increase the index of the rule.
# For _rule_priority_positions[chain_id][1]: index += 1
# _rule_priority_positions[chain_id][2]: index += 2
# index will be 4 in the end and the rule in the table chain
# combination will be added at index 4.
# If there are no rules in the table chain combination, a new rule
# has index 1.
index = 1
count = 0
if chain_id in self._rule_priority_positions:
positions = sorted(self._rule_priority_positions[chain_id].keys())
j = 0
while j < len(positions) and priority >= positions[j]:
index += self._rule_priority_positions[chain_id][positions[j]]
j += 1
# split the direct rule in some cases as iptables-restore can't handle
# compound args.
#
args_list = [list(args)]
args_list = self.split_value(args_list, [ "-s", "--source" ])
args_list = self.split_value(args_list, [ "-d", "--destination" ])
for _args in args_list:
transaction.add_rule(backend, backend.build_rule(enable, table, _chain, index, tuple(_args)))
index += 1
count += 1
self._register_rule(rule_id, chain_id, priority, enable, count)
transaction.add_fail(self._register_rule,
rule_id, chain_id, priority, not enable, count)
def _chain(self, add, ipv, table, chain, transaction):
self._check_ipv_table(ipv, table)
self._check_builtin_chain(ipv, table, chain)
table_id = (ipv, table)
if add:
if table_id in self._chains and \
chain in self._chains[table_id]:
raise FirewallError(errors.ALREADY_ENABLED,
"chain '%s' already is in '%s:%s'" % \
(chain, ipv, table))
else:
if table_id not in self._chains or \
chain not in self._chains[table_id]:
raise FirewallError(errors.NOT_ENABLED,
"chain '%s' is not in '%s:%s'" % \
(chain, ipv, table))
backend = self._fw.get_direct_backend_by_ipv(ipv)
transaction.add_rules(backend, backend.build_chain_rules(add, table, chain))
self._register_chain(table_id, chain, add)
transaction.add_fail(self._register_chain, table_id, chain, not add)
def _passthrough(self, enable, ipv, args, transaction):
self._check_ipv(ipv)
tuple_args = tuple(args)
if enable:
if ipv in self._passthroughs and \
tuple_args in self._passthroughs[ipv]:
raise FirewallError(errors.ALREADY_ENABLED,
"passthrough '%s', '%s'" % (ipv, args))
else:
if ipv not in self._passthroughs or \
tuple_args not in self._passthroughs[ipv]:
raise FirewallError(errors.NOT_ENABLED,
"passthrough '%s', '%s'" % (ipv, args))
backend = self._fw.get_direct_backend_by_ipv(ipv)
if enable:
backend.check_passthrough(args)
# try to find out if a zone chain should be used
if ipv in [ "ipv4", "ipv6" ]:
table, chain = backend.passthrough_parse_table_chain(args)
if table and chain:
self._fw.zone.create_zone_base_by_chain(ipv, table, chain)
_args = args
else:
_args = backend.reverse_passthrough(args)
transaction.add_rule(backend, _args)
self._register_passthrough(ipv, tuple_args, enable)
transaction.add_fail(self._register_passthrough, ipv, tuple_args,
not enable)
# -*- coding: utf-8 -*-
#
# Copyright (C) 2010-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import subprocess
__all__ = ["runProg"]
def runProg(prog, argv=None, stdin=None):
if argv is None:
argv = []
args = [prog] + argv
input_string = None
if stdin:
with open(stdin, 'r') as handle:
input_string = handle.read().encode()
env = {'LANG': 'C'}
try:
process = subprocess.Popen(args, stdin=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
close_fds=True, env=env)
except OSError:
return (255, '')
(output, err_output) = process.communicate(input_string)
output = output.decode('utf-8', 'replace')
return (process.returncode, output)
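# Example usage (hypothetical binary path): runProg("/usr/sbin/ipset",
# ["list", "-terse"]) returns a (returncode, output) tuple with stderr merged
# into stdout and LANG=C forced so the output is not localized. If the binary
# cannot be executed at all, (255, '') is returned.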
# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""The helper maxnamelen"""
HELPER_MAXNAMELEN = 32
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""ipset backend"""
__all__ = [ "FirewallIPSet" ]
from firewall.core.logger import log
from firewall.core.ipset import remove_default_create_options as rm_def_cr_opts, \
normalize_ipset_entry, check_entry_overlaps_existing, \
check_for_overlapping_entries
from firewall.core.io.ipset import IPSet
from firewall import errors
from firewall.errors import FirewallError
class FirewallIPSet(object):
def __init__(self, fw):
self._fw = fw
self._ipsets = { }
def __repr__(self):
return '%s(%r)' % (self.__class__, self._ipsets)
# ipsets
def cleanup(self):
self._ipsets.clear()
def check_ipset(self, name):
if name not in self.get_ipsets():
raise FirewallError(errors.INVALID_IPSET, name)
def query_ipset(self, name):
return name in self.get_ipsets()
def get_ipsets(self):
return sorted(self._ipsets.keys())
def has_ipsets(self):
return len(self._ipsets) > 0
def get_ipset(self, name, applied=False):
self.check_ipset(name)
obj = self._ipsets[name]
if applied:
self.check_applied_obj(obj)
return obj
def backends(self):
backends = []
if self._fw.nftables_enabled:
backends.append(self._fw.nftables_backend)
if self._fw.ipset_enabled:
backends.append(self._fw.ipset_backend)
return backends
def add_ipset(self, obj):
if obj.type not in self._fw.ipset_supported_types:
raise FirewallError(errors.INVALID_TYPE,
"'%s' is not supported by ipset." % obj.type)
self._ipsets[obj.name] = obj
def remove_ipset(self, name, keep=False):
obj = self._ipsets[name]
if obj.applied and not keep:
try:
for backend in self.backends():
backend.set_destroy(name)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
log.debug1("Keeping ipset '%s' because of timeout option", name)
del self._ipsets[name]
def apply_ipset(self, name):
obj = self._ipsets[name]
for backend in self.backends():
if backend.name == "ipset":
active = backend.set_get_active_terse()
if name in active and ("timeout" not in obj.options or \
obj.options["timeout"] == "0" or \
obj.type != active[name][0] or \
rm_def_cr_opts(obj.options) != \
active[name][1]):
try:
backend.set_destroy(name)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
if self._fw._individual_calls:
try:
backend.set_create(obj.name, obj.type, obj.options)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
obj.applied = True
if "timeout" in obj.options and \
obj.options["timeout"] != "0":
# no entries visible for ipsets with timeout
continue
try:
backend.set_flush(obj.name)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
for entry in obj.entries:
try:
backend.set_add(obj.name, entry)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
try:
backend.set_restore(obj.name, obj.type,
obj.entries, obj.options,
None)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
obj.applied = True
def apply_ipsets(self):
for name in self.get_ipsets():
obj = self._ipsets[name]
obj.applied = False
log.debug1("Applying ipset '%s'" % name)
self.apply_ipset(name)
def flush(self):
for backend in self.backends():
# nftables sets are part of the normal firewall ruleset.
if backend.name == "nftables":
continue
for ipset in self.get_ipsets():
try:
self.check_applied(ipset)
backend.set_destroy(ipset)
except FirewallError as msg:
if msg.code != errors.NOT_APPLIED:
raise msg
# TYPE
def get_type(self, name, applied=True):
return self.get_ipset(name, applied=applied).type
# DIMENSION
def get_dimension(self, name):
return len(self.get_ipset(name, applied=True).type.split(","))
def check_applied(self, name):
obj = self.get_ipset(name)
self.check_applied_obj(obj)
def check_applied_obj(self, obj):
if not obj.applied:
raise FirewallError(
errors.NOT_APPLIED, obj.name)
# OPTIONS
def get_family(self, name, applied=True):
obj = self.get_ipset(name, applied=applied)
if "family" in obj.options:
if obj.options["family"] == "inet6":
return "ipv6"
return "ipv4"
# ENTRIES
def add_entry(self, name, entry):
obj = self.get_ipset(name, applied=True)
entry = normalize_ipset_entry(entry)
IPSet.check_entry(entry, obj.options, obj.type)
if entry in obj.entries:
raise FirewallError(errors.ALREADY_ENABLED,
"'%s' already is in '%s'" % (entry, name))
check_entry_overlaps_existing(entry, obj.entries)
try:
for backend in self.backends():
backend.set_add(obj.name, entry)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
if "timeout" not in obj.options or obj.options["timeout"] == "0":
# no entries visible for ipsets with timeout
obj.entries.append(entry)
def remove_entry(self, name, entry):
obj = self.get_ipset(name, applied=True)
entry = normalize_ipset_entry(entry)
# no entry check for removal
if entry not in obj.entries:
raise FirewallError(errors.NOT_ENABLED,
"'%s' not in '%s'" % (entry, name))
try:
for backend in self.backends():
backend.set_delete(obj.name, entry)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
if "timeout" not in obj.options or obj.options["timeout"] == "0":
# no entries visible for ipsets with timeout
obj.entries.remove(entry)
def query_entry(self, name, entry):
obj = self.get_ipset(name, applied=True)
entry = normalize_ipset_entry(entry)
if "timeout" in obj.options and obj.options["timeout"] != "0":
# no entries visible for ipsets with timeout
raise FirewallError(errors.IPSET_WITH_TIMEOUT, name)
return entry in obj.entries
def get_entries(self, name):
obj = self.get_ipset(name, applied=True)
return obj.entries
def set_entries(self, name, entries):
obj = self.get_ipset(name, applied=True)
check_for_overlapping_entries(entries)
for entry in entries:
IPSet.check_entry(entry, obj.options, obj.type)
if "timeout" not in obj.options or obj.options["timeout"] == "0":
# no entries visible for ipsets with timeout
obj.entries = entries
try:
for backend in self.backends():
backend.set_flush(obj.name)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
obj.applied = True
try:
for backend in self.backends():
if self._fw._individual_calls:
for entry in obj.entries:
backend.set_add(obj.name, entry)
else:
backend.set_restore(obj.name, obj.type, obj.entries,
obj.options, None)
except Exception as msg:
raise FirewallError(errors.COMMAND_FAILED, msg)
else:
obj.applied = True
return
# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Transaction classes for firewalld"""
__all__ = [ "FirewallTransaction" ]
import traceback
from firewall.core.logger import log
from firewall import errors
from firewall.errors import FirewallError
class FirewallTransaction(object):
def __init__(self, fw):
self.fw = fw
self.rules = { } # [ ( backend.name, [ rule,.. ] ),.. ]
self.pre_funcs = [ ] # [ (func, args),.. ]
self.post_funcs = [ ] # [ (func, args),.. ]
self.fail_funcs = [ ] # [ (func, args),.. ]
self.modules = [ ] # [ module,.. ]
def clear(self):
self.rules.clear()
del self.pre_funcs[:]
del self.post_funcs[:]
del self.fail_funcs[:]
def add_rule(self, backend, rule):
self.rules.setdefault(backend.name, [ ]).append(rule)
def add_rules(self, backend, rules):
for rule in rules:
self.add_rule(backend, rule)
def query_rule(self, backend, rule):
return backend.name in self.rules and rule in self.rules[backend.name]
def remove_rule(self, backend, rule):
if backend.name in self.rules and rule in self.rules[backend.name]:
self.rules[backend.name].remove(rule)
def add_pre(self, func, *args):
self.pre_funcs.append((func, args))
def add_post(self, func, *args):
self.post_funcs.append((func, args))
def add_fail(self, func, *args):
self.fail_funcs.append((func, args))
def add_module(self, module):
if module not in self.modules:
self.modules.append(module)
def remove_module(self, module):
if module in self.modules:
self.modules.remove(module)
def add_modules(self, modules):
for module in modules:
self.add_module(module)
def remove_modules(self, modules):
for module in modules:
self.remove_module(module)
def prepare(self, enable):
log.debug4("%s.prepare(%s, %s)" % (type(self), enable, "..."))
rules = { }
if not enable:
# reverse rule order for cleanup
for backend_name in self.rules:
for rule in reversed(self.rules[backend_name]):
rules.setdefault(backend_name, [ ]).append(
self.fw.get_backend_by_name(backend_name).reverse_rule(rule))
else:
for backend_name in self.rules:
rules.setdefault(backend_name, [ ]).extend(self.rules[backend_name])
return rules, self.modules
def execute(self, enable):
log.debug4("%s.execute(%s)" % (type(self), enable))
rules, modules = self.prepare(enable)
# pre
self.pre()
# stage 1: apply rules
error = False
errorMsg = ""
done = [ ]
for backend_name in rules:
try:
self.fw.rules(backend_name, rules[backend_name])
except Exception as msg:
error = True
errorMsg = msg
log.debug1(traceback.format_exc())
log.error(msg)
else:
done.append(backend_name)
# stage 2: load modules
if not error:
module_return = self.fw.handle_modules(modules, enable)
if module_return:
# Debug log about issues loading modules, but don't error. The
# modules may be builtin or CONFIG_MODULES=n, in which case
# modprobe will fail. Or we may be running inside a container
# that doesn't have sufficient privileges. Unfortunately there
# is no way for us to know.
(status, msg) = module_return
if status:
log.debug1(msg)
# error case: revert rules
if error:
undo_rules = { }
for backend_name in done:
undo_rules[backend_name] = [ ]
for rule in reversed(rules[backend_name]):
undo_rules[backend_name].append(
self.fw.get_backend_by_name(backend_name).reverse_rule(rule))
for backend_name in undo_rules:
try:
self.fw.rules(backend_name, undo_rules[backend_name])
except Exception as msg:
log.debug1(traceback.format_exc())
log.error(msg)
# call failure functions
for (func, args) in self.fail_funcs:
try:
func(*args)
except Exception as msg:
log.debug1(traceback.format_exc())
log.error("Calling fail func %s(%s) failed: %s" % \
(func, args, msg))
raise FirewallError(errors.COMMAND_FAILED, errorMsg)
# post
self.post()
def pre(self):
log.debug4("%s.pre()" % type(self))
for (func, args) in self.pre_funcs:
try:
func(*args)
except Exception as msg:
log.debug1(traceback.format_exc())
log.error("Calling pre func %s(%s) failed: %s" % \
(func, args, msg))
def post(self):
log.debug4("%s.post()" % type(self))
for (func, args) in self.post_funcs:
try:
func(*args)
except Exception as msg:
log.debug1(traceback.format_exc())
log.error("Calling post func %s(%s) failed: %s" % \
(func, args, msg))
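# Sketch of how a FirewallTransaction is typically driven by the callers in
# this dump (e.g. FirewallDirect.add_chain above); "fw", "backend", "rule" and
# "undo_func" stand for whatever core, backend, rule and rollback objects the
# caller already holds:
#
#     transaction = FirewallTransaction(fw)
#     transaction.add_rule(backend, rule)       # queue low-level rules
#     transaction.add_fail(undo_func, arg)      # bookkeeping on failure
#     transaction.execute(True)                 # apply, or revert and raise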
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 Red Hat, Inc.
#
# Authors:
# Eric Garver <e@erig.me>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import absolute_import
import copy
import json
import ipaddress
from firewall.core.logger import log
from firewall.functions import check_mac, getPortRange, normalizeIP6, \
check_single_address, check_address
from firewall.errors import FirewallError, UNKNOWN_ERROR, INVALID_RULE, \
INVALID_ICMPTYPE, INVALID_TYPE, INVALID_ENTRY, \
INVALID_PORT
from firewall.core.rich import Rich_Accept, Rich_Reject, Rich_Drop, Rich_Mark, \
Rich_Masquerade, Rich_ForwardPort, Rich_IcmpBlock
from nftables.nftables import Nftables
TABLE_NAME = "firewalld"
TABLE_NAME_POLICY = TABLE_NAME + "_" + "policy_drop"
POLICY_CHAIN_PREFIX = "policy_"
# Map iptables (table, chain) to hooks and priorities.
# These are well defined by NF_IP_PRI_* defines in netfilter.
#
# This is analogous to ipXtables.BUILT_IN_CHAINS, but we omit the chains that
# are only used for direct rules.
#
# Note: All hooks use their standard position + NFT_HOOK_OFFSET. This means
# iptables will have DROP precedence. It also means that even if iptables
# ACCEPTs a packet it may still be dropped later by firewalld's rules.
#
NFT_HOOK_OFFSET = 10
IPTABLES_TO_NFT_HOOK = {
#"security": {
# "INPUT": ("input", 50 + NFT_HOOK_OFFSET),
# "OUTPUT": ("output", 50 + NFT_HOOK_OFFSET),
# "FORWARD": ("forward", 50 + NFT_HOOK_OFFSET),
#},
"raw": {
# "PREROUTING": ("prerouting", -300 + NFT_HOOK_OFFSET),
# "OUTPUT": ("output", -300 + NFT_HOOK_OFFSET),
},
"mangle": {
"PREROUTING": ("prerouting", -150 + NFT_HOOK_OFFSET),
# "POSTROUTING": ("postrouting", -150 + NFT_HOOK_OFFSET),
# "INPUT": ("input", -150 + NFT_HOOK_OFFSET),
# "OUTPUT": ("output", -150 + NFT_HOOK_OFFSET),
# "FORWARD": ("forward", -150 + NFT_HOOK_OFFSET),
},
"nat": {
"PREROUTING": ("prerouting", -100 + NFT_HOOK_OFFSET),
"POSTROUTING": ("postrouting", 100 + NFT_HOOK_OFFSET),
# "INPUT": ("input", 100 + NFT_HOOK_OFFSET),
# "OUTPUT": ("output", -100 + NFT_HOOK_OFFSET),
},
"filter": {
"PREROUTING": ("prerouting", 0 + NFT_HOOK_OFFSET),
"INPUT": ("input", 0 + NFT_HOOK_OFFSET),
"FORWARD": ("forward", 0 + NFT_HOOK_OFFSET),
"OUTPUT": ("output", 0 + NFT_HOOK_OFFSET),
},
}
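# Example lookup: IPTABLES_TO_NFT_HOOK["filter"]["INPUT"] == ("input", 10),
# i.e. the standard filter/INPUT priority 0 shifted by NFT_HOOK_OFFSET, so the
# nftables chains hook in just after their iptables counterparts.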
def _icmp_types_fragments(protocol, type, code=None):
fragments = [{"match": {"left": {"payload": {"protocol": protocol, "field": "type"}},
"op": "==",
"right": type}}]
if code is not None:
fragments.append({"match": {"left": {"payload": {"protocol": protocol, "field": "code"}},
"op": "==",
"right": code}})
return fragments
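# For example, _icmp_types_fragments("icmp", "destination-unreachable", 13)
# yields two nft JSON match expressions, roughly:
#
#     [{"match": {"left": {"payload": {"protocol": "icmp", "field": "type"}},
#                 "op": "==", "right": "destination-unreachable"}},
#      {"match": {"left": {"payload": {"protocol": "icmp", "field": "code"}},
#                 "op": "==", "right": 13}}]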
# Most ICMP types are provided by nft, but for the codes we have to use numeric
# values.
#
ICMP_TYPES_FRAGMENTS = {
"ipv4": {
"communication-prohibited": _icmp_types_fragments("icmp", "destination-unreachable", 13),
"destination-unreachable": _icmp_types_fragments("icmp", "destination-unreachable"),
"echo-reply": _icmp_types_fragments("icmp", "echo-reply"),
"echo-request": _icmp_types_fragments("icmp", "echo-request"),
"fragmentation-needed": _icmp_types_fragments("icmp", "destination-unreachable", 4),
"host-precedence-violation": _icmp_types_fragments("icmp", "destination-unreachable", 14),
"host-prohibited": _icmp_types_fragments("icmp", "destination-unreachable", 10),
"host-redirect": _icmp_types_fragments("icmp", "redirect", 1),
"host-unknown": _icmp_types_fragments("icmp", "destination-unreachable", 7),
"host-unreachable": _icmp_types_fragments("icmp", "destination-unreachable", 1),
"ip-header-bad": _icmp_types_fragments("icmp", "parameter-problem", 1),
"network-prohibited": _icmp_types_fragments("icmp", "destination-unreachable", 8),
"network-redirect": _icmp_types_fragments("icmp", "redirect", 0),
"network-unknown": _icmp_types_fragments("icmp", "destination-unreachable", 6),
"network-unreachable": _icmp_types_fragments("icmp", "destination-unreachable", 0),
"parameter-problem": _icmp_types_fragments("icmp", "parameter-problem"),
"port-unreachable": _icmp_types_fragments("icmp", "destination-unreachable", 3),
"precedence-cutoff": _icmp_types_fragments("icmp", "destination-unreachable", 15),
"protocol-unreachable": _icmp_types_fragments("icmp", "destination-unreachable", 2),
"redirect": _icmp_types_fragments("icmp", "redirect"),
"required-option-missing": _icmp_types_fragments("icmp", "parameter-problem", 1),
"router-advertisement": _icmp_types_fragments("icmp", "router-advertisement"),
"router-solicitation": _icmp_types_fragments("icmp", "router-solicitation"),
"source-quench": _icmp_types_fragments("icmp", "source-quench"),
"source-route-failed": _icmp_types_fragments("icmp", "destination-unreachable", 5),
"time-exceeded": _icmp_types_fragments("icmp", "time-exceeded"),
"timestamp-reply": _icmp_types_fragments("icmp", "timestamp-reply"),
"timestamp-request": _icmp_types_fragments("icmp", "timestamp-request"),
"tos-host-redirect": _icmp_types_fragments("icmp", "redirect", 3),
"tos-host-unreachable": _icmp_types_fragments("icmp", "destination-unreachable", 12),
"tos-network-redirect": _icmp_types_fragments("icmp", "redirect", 2),
"tos-network-unreachable": _icmp_types_fragments("icmp", "destination-unreachable", 11),
"ttl-zero-during-reassembly": _icmp_types_fragments("icmp", "time-exceeded", 1),
"ttl-zero-during-transit": _icmp_types_fragments("icmp", "time-exceeded", 0),
},
"ipv6": {
"address-unreachable": _icmp_types_fragments("icmpv6", "destination-unreachable", 3),
"bad-header": _icmp_types_fragments("icmpv6", "parameter-problem", 0),
"beyond-scope": _icmp_types_fragments("icmpv6", "destination-unreachable", 2),
"communication-prohibited": _icmp_types_fragments("icmpv6", "destination-unreachable", 1),
"destination-unreachable": _icmp_types_fragments("icmpv6", "destination-unreachable"),
"echo-reply": _icmp_types_fragments("icmpv6", "echo-reply"),
"echo-request": _icmp_types_fragments("icmpv6", "echo-request"),
"failed-policy": _icmp_types_fragments("icmpv6", "destination-unreachable", 5),
"mld-listener-done": _icmp_types_fragments("icmpv6", "mld-listener-done"),
"mld-listener-query": _icmp_types_fragments("icmpv6", "mld-listener-query"),
"mld-listener-report": _icmp_types_fragments("icmpv6", "mld-listener-report"),
"mld2-listener-report": _icmp_types_fragments("icmpv6", "mld2-listener-report"),
"neighbour-advertisement": _icmp_types_fragments("icmpv6", "nd-neighbor-advert"),
"neighbour-solicitation": _icmp_types_fragments("icmpv6", "nd-neighbor-solicit"),
"no-route": _icmp_types_fragments("icmpv6", "destination-unreachable", 0),
"packet-too-big": _icmp_types_fragments("icmpv6", "packet-too-big"),
"parameter-problem": _icmp_types_fragments("icmpv6", "parameter-problem"),
"port-unreachable": _icmp_types_fragments("icmpv6", "destination-unreachable", 4),
"redirect": _icmp_types_fragments("icmpv6", "nd-redirect"),
"reject-route": _icmp_types_fragments("icmpv6", "destination-unreachable", 6),
"router-advertisement": _icmp_types_fragments("icmpv6", "nd-router-advert"),
"router-solicitation": _icmp_types_fragments("icmpv6", "nd-router-solicit"),
"time-exceeded": _icmp_types_fragments("icmpv6", "time-exceeded"),
"ttl-zero-during-reassembly": _icmp_types_fragments("icmpv6", "time-exceeded", 1),
"ttl-zero-during-transit": _icmp_types_fragments("icmpv6", "time-exceeded", 0),
"unknown-header-type": _icmp_types_fragments("icmpv6", "parameter-problem", 1),
"unknown-option": _icmp_types_fragments("icmpv6", "parameter-problem", 2),
}
}
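# Editorial note (not part of the upstream table): each entry above maps a
# firewalld icmptype name to the nft match fragments built by
# _icmp_types_fragments(), i.e. a match on the given protocol, the named type
# and, where a third argument is present, the numeric code. For example,
# "ttl-zero-during-transit" for "ipv4" matches icmp type time-exceeded with
# code 0.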
class nftables(object):
name = "nftables"
policies_supported = True
def __init__(self, fw):
self._fw = fw
self.restore_command_exists = True
self.available_tables = []
self.rule_to_handle = {}
self.rule_ref_count = {}
self.rich_rule_priority_counts = {}
self.policy_priority_counts = {}
self.zone_source_index_cache = {}
self.created_tables = {"inet": [], "ip": [], "ip6": []}
self.nftables = Nftables()
self.nftables.set_echo_output(True)
self.nftables.set_handle_output(True)
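# State kept across set_rules() calls (summary inferred from the code below):
# rule_to_handle maps a deduplicated rule key to the handle echoed back by
# libnftables, so the rule can later be deleted by handle; rule_ref_count
# counts identical submissions so a rule is only really removed when its last
# reference goes away; rich_rule_priority_counts, policy_priority_counts and
# zone_source_index_cache turn the %%RICH_RULE_PRIORITY%%, %%POLICY_PRIORITY%%
# and %%ZONE_SOURCE%% placeholders into concrete insertion indices.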
def _run_replace_zone_source(self, rule, zone_source_index_cache):
for verb in ["add", "insert", "delete"]:
if verb in rule:
break
if "%%ZONE_SOURCE%%" in rule[verb]["rule"]:
zone_source = (rule[verb]["rule"]["%%ZONE_SOURCE%%"]["zone"],
rule[verb]["rule"]["%%ZONE_SOURCE%%"]["address"])
del rule[verb]["rule"]["%%ZONE_SOURCE%%"]
elif "%%ZONE_INTERFACE%%" in rule[verb]["rule"]:
zone_source = None
del rule[verb]["rule"]["%%ZONE_INTERFACE%%"]
else:
return
family = rule[verb]["rule"]["family"]
if zone_source and verb == "delete":
if family in zone_source_index_cache and \
zone_source in zone_source_index_cache[family]:
zone_source_index_cache[family].remove(zone_source)
elif verb != "delete":
if family not in zone_source_index_cache:
zone_source_index_cache[family] = []
if zone_source:
# order source based dispatch by zone name
if zone_source not in zone_source_index_cache[family]:
zone_source_index_cache[family].append(zone_source)
zone_source_index_cache[family].sort(key=lambda x: x[0])
index = zone_source_index_cache[family].index(zone_source)
else:
if self._fw._allow_zone_drifting:
index = 0
else:
index = len(zone_source_index_cache[family])
_verb_snippet = rule[verb]
del rule[verb]
if index == 0:
rule["insert"] = _verb_snippet
else:
index -= 1 # point to the rule before insertion point
rule["add"] = _verb_snippet
rule["add"]["rule"]["index"] = index
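# Example (hypothetical zone/address, for illustration only): a rule carrying
# {"%%ZONE_SOURCE%%": {"zone": "internal", "address": "10.1.0.0/16"}} has the
# placeholder stripped here; the (zone, address) pair is kept sorted by zone
# name in zone_source_index_cache, and its resulting position decides whether
# the verb becomes "insert" (head of the chain) or "add" with an explicit
# "index" pointing at the preceding rule.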
def reverse_rule(self, dict):
if "insert" in dict:
return {"delete": copy.deepcopy(dict["insert"])}
elif "add" in dict:
return {"delete": copy.deepcopy(dict["add"])}
else:
raise FirewallError(UNKNOWN_ERROR, "Failed to reverse rule")
def _set_rule_replace_priority(self, rule, priority_counts, token):
for verb in ["add", "insert", "delete"]:
if verb in rule:
break
if token in rule[verb]["rule"]:
priority = rule[verb]["rule"][token]
del rule[verb]["rule"][token]
if type(priority) != int:
raise FirewallError(INVALID_RULE, "priority must be followed by a number")
chain = (rule[verb]["rule"]["family"], rule[verb]["rule"]["chain"]) # family, chain
# Add the rule to the priority counts. We don't need to store the
# rule, just bump the ref count for the priority value.
if verb == "delete":
if chain not in priority_counts or \
priority not in priority_counts[chain] or \
priority_counts[chain][priority] <= 0:
raise FirewallError(UNKNOWN_ERROR, "nonexistent or underflow of priority count")
priority_counts[chain][priority] -= 1
else:
if chain not in priority_counts:
priority_counts[chain] = {}
if priority not in priority_counts[chain]:
priority_counts[chain][priority] = 0
# calculate index of new rule
index = 0
for p in sorted(priority_counts[chain].keys()):
if p == priority and verb == "insert":
break
index += priority_counts[chain][p]
if p == priority and verb == "add":
break
priority_counts[chain][priority] += 1
_verb_snippet = rule[verb]
del rule[verb]
if index == 0:
rule["insert"] = _verb_snippet
else:
index -= 1 # point to the rule before insertion point
rule["add"] = _verb_snippet
rule["add"]["rule"]["index"] = index
def _get_rule_key(self, rule):
for verb in ["add", "insert", "delete"]:
if verb in rule and "rule" in rule[verb]:
rule_key = copy.deepcopy(rule[verb]["rule"])
for non_key in ["index", "handle", "position"]:
if non_key in rule_key:
del rule_key[non_key]
# str(rule_key) is insufficient because dictionary order is
# not stable, so abuse the JSON library
rule_key = json.dumps(rule_key, sort_keys=True)
return rule_key
# Not a rule (it's a table, chain, etc)
return None
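# Illustration: for {"add": {"rule": {"family": "inet", "table": TABLE_NAME,
# "chain": "filter_INPUT", "expr": [...], "index": 3}}} the key is the sorted
# JSON dump of the inner rule dict with "index"/"handle"/"position" removed,
# so the same logical rule always yields the same key no matter where it was
# inserted. (Values here are examples only.)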
def set_rules(self, rules, log_denied):
_valid_verbs = ["add", "insert", "delete", "flush", "replace"]
_valid_add_verbs = ["add", "insert", "replace"]
_deduplicated_rules = []
_executed_rules = []
rich_rule_priority_counts = copy.deepcopy(self.rich_rule_priority_counts)
policy_priority_counts = copy.deepcopy(self.policy_priority_counts)
zone_source_index_cache = copy.deepcopy(self.zone_source_index_cache)
rule_ref_count = self.rule_ref_count.copy()
for rule in rules:
if type(rule) != dict:
raise FirewallError(UNKNOWN_ERROR, "rule must be a dictionary, rule: %s" % (rule))
for verb in _valid_verbs:
if verb in rule:
break
if verb not in rule:
raise FirewallError(INVALID_RULE, "no valid verb found, rule: %s" % (rule))
rule_key = self._get_rule_key(rule)
# rule deduplication
if rule_key in rule_ref_count:
log.debug2("%s: prev rule ref cnt %d, %s", self.__class__,
rule_ref_count[rule_key], rule_key)
if verb != "delete":
rule_ref_count[rule_key] += 1
continue
elif rule_ref_count[rule_key] > 1:
rule_ref_count[rule_key] -= 1
continue
elif rule_ref_count[rule_key] == 1:
rule_ref_count[rule_key] -= 1
else:
raise FirewallError(UNKNOWN_ERROR, "rule ref count bug: rule_key '%s', cnt %d"
% (rule_key, rule_ref_count[rule_key]))
elif rule_key and verb != "delete":
rule_ref_count[rule_key] = 1
_deduplicated_rules.append(rule)
_rule = copy.deepcopy(rule)
if rule_key:
# filter empty rule expressions. Rich rules add quite a bit of
# them, but it makes the rest of the code simpler. libnftables
# does not tolerate them.
_rule[verb]["rule"]["expr"] = list(filter(None, _rule[verb]["rule"]["expr"]))
self._set_rule_replace_priority(_rule, rich_rule_priority_counts, "%%RICH_RULE_PRIORITY%%")
self._set_rule_replace_priority(_rule, policy_priority_counts, "%%POLICY_PRIORITY%%")
self._run_replace_zone_source(_rule, zone_source_index_cache)
# delete using rule handle
if verb == "delete":
_rule = {"delete": {"rule": {"family": _rule["delete"]["rule"]["family"],
"table": _rule["delete"]["rule"]["table"],
"chain": _rule["delete"]["rule"]["chain"],
"handle": self.rule_to_handle[rule_key]}}}
_executed_rules.append(_rule)
json_blob = {"nftables": [{"metainfo": {"json_schema_version": 1}}] + _executed_rules}
if log.getDebugLogLevel() >= 3:
# guarded with if statement because json.dumps() is expensive.
log.debug3("%s: calling python-nftables with JSON blob: %s", self.__class__,
json.dumps(json_blob))
rc, output, error = self.nftables.json_cmd(json_blob)
if rc != 0:
raise ValueError("'%s' failed: %s\nJSON blob:\n%s" % ("python-nftables", error, json.dumps(json_blob)))
self.rich_rule_priority_counts = rich_rule_priority_counts
self.policy_priority_counts = policy_priority_counts
self.zone_source_index_cache = zone_source_index_cache
self.rule_ref_count = rule_ref_count
index = 0
for rule in _deduplicated_rules:
index += 1 # +1 due to metainfo
rule_key = self._get_rule_key(rule)
if not rule_key:
continue
if "delete" in rule:
del self.rule_to_handle[rule_key]
del self.rule_ref_count[rule_key]
continue
for verb in _valid_add_verbs:
if verb in output["nftables"][index]:
break
if verb not in output["nftables"][index]:
continue
self.rule_to_handle[rule_key] = output["nftables"][index][verb]["rule"]["handle"]
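# For reference, the blob submitted above has the shape
#   {"nftables": [{"metainfo": {"json_schema_version": 1}},
#                 {"add": {"rule": {...}}}, ...]}
# and libnftables echoes one output element per submitted command, which is
# why the handle lookup loop starts counting at 1 to skip the metainfo entry.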
def set_rule(self, rule, log_denied):
self.set_rules([rule], log_denied)
return ""
def get_available_tables(self, table=None):
# Tables always exist in nftables
return [table] if table else IPTABLES_TO_NFT_HOOK.keys()
def _build_delete_table_rules(self, table):
# To avoid nftables returning ENOENT we always add the table before
# deleting to guarantee it will exist.
#
# In the future, this add+delete should be replaced with "destroy", but
# that verb is too new to rely upon.
rules = []
for family in ["inet", "ip", "ip6"]:
rules.append({"add": {"table": {"family": family,
"name": table}}})
rules.append({"delete": {"table": {"family": family,
"name": table}}})
return rules
def build_flush_rules(self):
# Policy is stashed in a separate table that we're _not_ going to
# flush. As such, we retain the policy rule handles and ref counts.
saved_rule_to_handle = {}
saved_rule_ref_count = {}
for rule in self._build_set_policy_rules_ct_rules(True):
policy_key = self._get_rule_key(rule)
if policy_key in self.rule_to_handle:
saved_rule_to_handle[policy_key] = self.rule_to_handle[policy_key]
saved_rule_ref_count[policy_key] = self.rule_ref_count[policy_key]
self.rule_to_handle = saved_rule_to_handle
self.rule_ref_count = saved_rule_ref_count
self.rich_rule_priority_counts = {}
self.policy_priority_counts = {}
self.zone_source_index_cache = {}
for family in ["inet", "ip", "ip6"]:
if TABLE_NAME in self.created_tables[family]:
self.created_tables[family].remove(TABLE_NAME)
return self._build_delete_table_rules(TABLE_NAME)
def _build_set_policy_rules_ct_rules(self, enable):
add_del = { True: "add", False: "delete" }[enable]
rules = []
for hook in ["input", "forward", "output"]:
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME_POLICY,
"chain": "%s_%s" % ("filter", hook),
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["established", "related"]}}},
{"accept": None}]}}})
return rules
def build_set_policy_rules(self, policy):
# Policy is not exposed to the user. It's only to make sure we DROP
# packets while reloading and for panic mode. As such, using hooks with
# a higher priority than our base chains is sufficient.
rules = []
if policy == "PANIC":
rules.append({"add": {"table": {"family": "inet",
"name": TABLE_NAME_POLICY}}})
self.created_tables["inet"].append(TABLE_NAME_POLICY)
# Use "raw" priority for panic mode. This occurs before
# conntrack, mangle, nat, etc
for hook in ["prerouting", "output"]:
rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME_POLICY,
"name": "%s_%s" % ("raw", hook),
"type": "filter",
"hook": hook,
"prio": -300 + NFT_HOOK_OFFSET - 1,
"policy": "drop"}}})
if policy == "DROP":
rules.append({"add": {"table": {"family": "inet",
"name": TABLE_NAME_POLICY}}})
self.created_tables["inet"].append(TABLE_NAME_POLICY)
# To drop everything except existing connections we use
# "filter" because it occurs _after_ conntrack.
for hook in ["input", "forward", "output"]:
rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME_POLICY,
"name": "%s_%s" % ("filter", hook),
"type": "filter",
"hook": hook,
"prio": 0 + NFT_HOOK_OFFSET - 1,
"policy": "drop"}}})
rules += self._build_set_policy_rules_ct_rules(True)
elif policy == "ACCEPT":
for rule in self._build_set_policy_rules_ct_rules(False):
policy_key = self._get_rule_key(rule)
if policy_key in self.rule_to_handle:
rules.append(rule)
rules += self._build_delete_table_rules(TABLE_NAME_POLICY)
if TABLE_NAME_POLICY in self.created_tables["inet"]:
self.created_tables["inet"].remove(TABLE_NAME_POLICY)
else:
raise FirewallError(UNKNOWN_ERROR, "not implemented")
return rules
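# Note on the priorities used above: PANIC hooks at raw priority
# (-300 + NFT_HOOK_OFFSET - 1), i.e. before conntrack sees the packet, while
# DROP hooks at filter priority (0 + NFT_HOOK_OFFSET - 1) and is paired with
# the ct state established,related accept rules so existing connections keep
# working, e.g. during a reload.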
def supported_icmp_types(self, ipv=None):
# nftables supports any icmp_type via arbitrary type/code matching.
# We just need a translation for it in ICMP_TYPES_FRAGMENTS.
supported = set()
for _ipv in [ipv] if ipv else ICMP_TYPES_FRAGMENTS.keys():
supported.update(ICMP_TYPES_FRAGMENTS[_ipv].keys())
return list(supported)
def build_default_tables(self):
default_tables = []
for family in ["inet", "ip", "ip6"]:
default_tables.append({"add": {"table": {"family": family,
"name": TABLE_NAME}}})
self.created_tables[family].append(TABLE_NAME)
return default_tables
def build_default_rules(self, log_denied="off"):
default_rules = []
for chain in IPTABLES_TO_NFT_HOOK["mangle"].keys():
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "mangle_%s" % chain,
"type": "filter",
"hook": "%s" % IPTABLES_TO_NFT_HOOK["mangle"][chain][0],
"prio": IPTABLES_TO_NFT_HOOK["mangle"][chain][1]}}})
for dispatch_suffix in ["POLICIES_pre", "ZONES_SOURCE", "ZONES", "POLICIES_post"] if self._fw._allow_zone_drifting else ["POLICIES_pre", "ZONES", "POLICIES_post"]:
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "mangle_%s_%s" % (chain, dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "mangle_%s" % chain,
"expr": [{"jump": {"target": "mangle_%s_%s" % (chain, dispatch_suffix)}}]}}})
for family in ["ip", "ip6"]:
for chain in IPTABLES_TO_NFT_HOOK["nat"].keys():
default_rules.append({"add": {"chain": {"family": family,
"table": TABLE_NAME,
"name": "nat_%s" % chain,
"type": "nat",
"hook": "%s" % IPTABLES_TO_NFT_HOOK["nat"][chain][0],
"prio": IPTABLES_TO_NFT_HOOK["nat"][chain][1]}}})
for dispatch_suffix in ["POLICIES_pre", "ZONES_SOURCE", "ZONES", "POLICIES_post"] if self._fw._allow_zone_drifting else ["POLICIES_pre", "ZONES", "POLICIES_post"]:
default_rules.append({"add": {"chain": {"family": family,
"table": TABLE_NAME,
"name": "nat_%s_%s" % (chain, dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": family,
"table": TABLE_NAME,
"chain": "nat_%s" % chain,
"expr": [{"jump": {"target": "nat_%s_%s" % (chain, dispatch_suffix)}}]}}})
for chain in IPTABLES_TO_NFT_HOOK["filter"].keys():
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "filter_%s" % chain,
"type": "filter",
"hook": "%s" % IPTABLES_TO_NFT_HOOK["filter"][chain][0],
"prio": IPTABLES_TO_NFT_HOOK["filter"][chain][1]}}})
# filter, INPUT
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["established", "related"]}}},
{"accept": None}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [{"match": {"left": {"ct": {"key": "status"}},
"op": "in",
"right": "dnat"}},
{"accept": None}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [{"match": {"left": {"meta": {"key": "iifname"}},
"op": "==",
"right": "lo"}},
{"accept": None}]}}})
for dispatch_suffix in ["POLICIES_pre", "ZONES_SOURCE", "ZONES", "POLICIES_post"] if self._fw._allow_zone_drifting else ["POLICIES_pre", "ZONES", "POLICIES_post"]:
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "filter_%s_%s" % ("INPUT", dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [{"jump": {"target": "filter_%s_%s" % ("INPUT", dispatch_suffix)}}]}}})
if log_denied != "off":
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["invalid"]}}},
self._pkttype_match_fragment(log_denied),
{"log": {"prefix": "STATE_INVALID_DROP: "}}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["invalid"]}}},
{"drop": None}]}}})
if log_denied != "off":
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [self._pkttype_match_fragment(log_denied),
{"log": {"prefix": "FINAL_REJECT: "}}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "INPUT",
"expr": [{"reject": {"type": "icmpx", "expr": "admin-prohibited"}}]}}})
# filter, FORWARD
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["established", "related"]}}},
{"accept": None}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"match": {"left": {"ct": {"key": "status"}},
"op": "in",
"right": "dnat"}},
{"accept": None}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"match": {"left": {"meta": {"key": "iifname"}},
"op": "==",
"right": "lo"}},
{"accept": None}]}}})
for dispatch_suffix in ["POLICIES_pre"]:
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "filter_%s_%s" % ("FORWARD", dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"jump": {"target": "filter_%s_%s" % ("FORWARD", dispatch_suffix)}}]}}})
for direction in ["IN", "OUT"]:
for dispatch_suffix in ["ZONES_SOURCE", "ZONES"] if self._fw._allow_zone_drifting else ["ZONES"]:
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "filter_%s_%s_%s" % ("FORWARD", direction, dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"jump": {"target": "filter_%s_%s_%s" % ("FORWARD", direction, dispatch_suffix)}}]}}})
for dispatch_suffix in ["POLICIES_post"]:
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "filter_%s_%s" % ("FORWARD", dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"jump": {"target": "filter_%s_%s" % ("FORWARD", dispatch_suffix)}}]}}})
if log_denied != "off":
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["invalid"]}}},
self._pkttype_match_fragment(log_denied),
{"log": {"prefix": "STATE_INVALID_DROP: "}}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["invalid"]}}},
{"drop": None}]}}})
if log_denied != "off":
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [self._pkttype_match_fragment(log_denied),
{"log": {"prefix": "FINAL_REJECT: "}}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "FORWARD",
"expr": [{"reject": {"type": "icmpx", "expr": "admin-prohibited"}}]}}})
# filter, OUTPUT
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "OUTPUT",
"expr": [{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["established", "related"]}}},
{"accept": None}]}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_OUTPUT",
"expr": [{"match": {"left": {"meta": {"key": "oifname"}},
"op": "==",
"right": "lo"}},
{"accept": None}]}}})
for dispatch_suffix in ["POLICIES_pre"]:
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "filter_%s_%s" % ("OUTPUT", dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "OUTPUT",
"expr": [{"jump": {"target": "filter_%s_%s" % ("OUTPUT", dispatch_suffix)}}]}}})
for dispatch_suffix in ["POLICIES_post"]:
default_rules.append({"add": {"chain": {"family": "inet",
"table": TABLE_NAME,
"name": "filter_%s_%s" % ("OUTPUT", dispatch_suffix)}}})
default_rules.append({"add": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s" % "OUTPUT",
"expr": [{"jump": {"target": "filter_%s_%s" % ("OUTPUT", dispatch_suffix)}}]}}})
return default_rules
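# Resulting layout for one hook, e.g. filter INPUT (illustration of the rules
# built above):
#   filter_INPUT (base chain, hooked per IPTABLES_TO_NFT_HOOK["filter"])
#     ct state established,related accept; ct status dnat accept; iif lo accept
#     jump filter_INPUT_POLICIES_pre
#     jump filter_INPUT_ZONES_SOURCE   (only if zone drifting is allowed)
#     jump filter_INPUT_ZONES
#     jump filter_INPUT_POLICIES_post
#     optional log + drop of invalid ct state, then final log + reject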
def get_zone_table_chains(self, table):
if table == "filter":
return ["INPUT", "FORWARD_IN", "FORWARD_OUT"]
if table == "mangle":
return ["PREROUTING"]
if table == "nat":
return ["PREROUTING", "POSTROUTING"]
return []
def build_policy_ingress_egress_rules(self, enable, policy, table, chain,
ingress_interfaces, egress_interfaces,
ingress_sources, egress_sources,
family="inet"):
# nat tables need to use ip/ip6 family
if table == "nat" and family == "inet":
rules = []
rules.extend(self.build_policy_ingress_egress_rules(enable, policy, table, chain,
ingress_interfaces, egress_interfaces,
ingress_sources, egress_sources,
family="ip"))
rules.extend(self.build_policy_ingress_egress_rules(enable, policy, table, chain,
ingress_interfaces, egress_interfaces,
ingress_sources, egress_sources,
family="ip6"))
return rules
p_obj = self._fw.policy.get_policy(policy)
chain_suffix = "pre" if p_obj.priority < 0 else "post"
isSNAT = True if (table == "nat" and chain == "POSTROUTING") else False
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX, isSNAT)
ingress_fragments = []
egress_fragments = []
if ingress_interfaces:
ingress_fragments.append({"match": {"left": {"meta": {"key": "iifname"}},
"op": "==",
"right": {"set": list(ingress_interfaces)}}})
if egress_interfaces:
egress_fragments.append({"match": {"left": {"meta": {"key": "oifname"}},
"op": "==",
"right": {"set": list(egress_interfaces)}}})
ipv_to_family = {"ipv4": "ip", "ipv6": "ip6"}
if ingress_sources:
for src in ingress_sources:
# skip if this source doesn't apply to the current family.
if table == "nat":
ipv = self._fw.zone.check_source(src)
if ipv in ipv_to_family and family != ipv_to_family[ipv]:
continue
ingress_fragments.append(self._rule_addr_fragment("saddr", src))
if egress_sources:
for dst in egress_sources:
# skip if this source doesn't apply to the current family.
if table == "nat":
ipv = self._fw.zone.check_source(dst)
if ipv in ipv_to_family and family != ipv_to_family[ipv]:
continue
egress_fragments.append(self._rule_addr_fragment("daddr", dst))
def _generate_policy_dispatch_rule(ingress_fragment, egress_fragment):
expr_fragments = []
if ingress_fragment:
expr_fragments.append(ingress_fragment)
if egress_fragment:
expr_fragments.append(egress_fragment)
expr_fragments.append({"jump": {"target": "%s_%s" % (table, _policy)}})
rule = {"family": family,
"table": TABLE_NAME,
"chain": "%s_%s_POLICIES_%s" % (table, chain, chain_suffix),
"expr": expr_fragments}
rule.update(self._policy_priority_fragment(p_obj))
if enable:
return {"add": {"rule": rule}}
else:
return {"delete": {"rule": rule}}
rules = []
if ingress_fragments: # zone --> [zone, ANY, HOST]
for ingress_fragment in ingress_fragments:
if egress_fragments:
# zone --> zone
for egress_fragment in egress_fragments:
rules.append(_generate_policy_dispatch_rule(ingress_fragment, egress_fragment))
elif table == "nat" and egress_sources:
# if the egress source is not for the current family (there
# are no egress fragments), then avoid creating an invalid
# catch all rule.
pass
else:
# zone --> [ANY, HOST]
rules.append(_generate_policy_dispatch_rule(ingress_fragment, None))
elif table == "nat" and ingress_sources:
# if the ingress source is not for the current family (there are no
# ingress fragments), then avoid creating an invalid catch all
# rule.
pass
else: # [ANY, HOST] --> [zone, ANY, HOST]
if egress_fragments:
# [ANY, HOST] --> zone
for egress_fragment in egress_fragments:
rules.append(_generate_policy_dispatch_rule(None, egress_fragment))
elif table == "nat" and egress_sources:
# if the egress source is not for the current family (there are
# no egress fragments), then avoid creating an invalid catch
# all rule.
pass
else:
# [ANY, HOST] --> [ANY, HOST]
rules.append(_generate_policy_dispatch_rule(None, None))
return rules
def build_zone_source_interface_rules(self, enable, zone, policy, interface,
table, chain, append=False,
family="inet"):
# nat tables need to use ip/ip6 family
if table == "nat" and family == "inet":
rules = []
rules.extend(self.build_zone_source_interface_rules(enable, zone, policy,
interface, table, chain, append, "ip"))
rules.extend(self.build_zone_source_interface_rules(enable, zone, policy,
interface, table, chain, append, "ip6"))
return rules
isSNAT = True if (table == "nat" and chain == "POSTROUTING") else False
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX, isSNAT=isSNAT)
opt = {
"PREROUTING": "iifname",
"POSTROUTING": "oifname",
"INPUT": "iifname",
"FORWARD_IN": "iifname",
"FORWARD_OUT": "oifname",
"OUTPUT": "oifname",
}[chain]
if interface[len(interface)-1] == "+":
interface = interface[:len(interface)-1] + "*"
action = "goto"
if interface == "*":
expr_fragments = [{action: {"target": "%s_%s" % (table, _policy)}}]
else:
expr_fragments = [{"match": {"left": {"meta": {"key": opt}},
"op": "==",
"right": interface}},
{action: {"target": "%s_%s" % (table, _policy)}}]
if enable and not append:
verb = "insert"
rule = {"family": family,
"table": TABLE_NAME,
"chain": "%s_%s_ZONES" % (table, chain),
"expr": expr_fragments}
rule.update(self._zone_interface_fragment())
elif enable:
verb = "add"
rule = {"family": family,
"table": TABLE_NAME,
"chain": "%s_%s_ZONES" % (table, chain),
"expr": expr_fragments}
else:
verb = "delete"
rule = {"family": family,
"table": TABLE_NAME,
"chain": "%s_%s_ZONES" % (table, chain),
"expr": expr_fragments}
if not append:
rule.update(self._zone_interface_fragment())
return [{verb: {"rule": rule}}]
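# Example (hypothetical interface name): binding "eth0" to a zone in the
# filter/INPUT case produces roughly
#   {"insert": {"rule": {"family": "inet", "table": TABLE_NAME,
#                        "chain": "filter_INPUT_ZONES",
#                        "expr": [an iifname == "eth0" match,
#                                 {"goto": {"target": "filter_" + _policy}}],
#                        "%%ZONE_INTERFACE%%": None}}}
# with the placeholder later resolved to an index by _run_replace_zone_source().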
def build_zone_source_address_rules(self, enable, zone, policy,
address, table, chain, family="inet"):
# nat tables need to use ip/ip6 family
if table == "nat" and family == "inet":
rules = []
if address.startswith("ipset:"):
ipset_family = self._set_get_family(address[len("ipset:"):])
else:
ipset_family = None
if check_address("ipv4", address) or check_mac(address) or ipset_family == "ip":
rules.extend(self.build_zone_source_address_rules(enable, zone, policy,
address, table, chain, "ip"))
if check_address("ipv6", address) or check_mac(address) or ipset_family == "ip6":
rules.extend(self.build_zone_source_address_rules(enable, zone, policy,
address, table, chain, "ip6"))
return rules
isSNAT = True if (table == "nat" and chain == "POSTROUTING") else False
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX, isSNAT=isSNAT)
add_del = { True: "insert", False: "delete" }[enable]
opt = {
"PREROUTING": "saddr",
"POSTROUTING": "daddr",
"INPUT": "saddr",
"FORWARD_IN": "saddr",
"FORWARD_OUT": "daddr",
"OUTPUT": "daddr",
}[chain]
if self._fw._allow_zone_drifting:
zone_dispatch_chain = "%s_%s_ZONES_SOURCE" % (table, chain)
else:
zone_dispatch_chain = "%s_%s_ZONES" % (table, chain)
action = "goto"
rule = {"family": family,
"table": TABLE_NAME,
"chain": zone_dispatch_chain,
"expr": [self._rule_addr_fragment(opt, address),
{action: {"target": "%s_%s" % (table, _policy)}}]}
rule.update(self._zone_source_fragment(zone, address))
return [{add_del: {"rule": rule}}]
def build_policy_chain_rules(self, enable, policy, table, chain, family="inet"):
# nat tables need to use ip/ip6 family
if table == "nat" and family == "inet":
rules = []
rules.extend(self.build_policy_chain_rules(enable, policy, table, chain, "ip"))
rules.extend(self.build_policy_chain_rules(enable, policy, table, chain, "ip6"))
return rules
add_del = { True: "add", False: "delete" }[enable]
isSNAT = True if (table == "nat" and chain == "POSTROUTING") else False
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX, isSNAT=isSNAT)
rules = []
rules.append({add_del: {"chain": {"family": family,
"table": TABLE_NAME,
"name": "%s_%s" % (table, _policy)}}})
for chain_suffix in ["pre", "log", "deny", "allow", "post"]:
rules.append({add_del: {"chain": {"family": family,
"table": TABLE_NAME,
"name": "%s_%s_%s" % (table, _policy, chain_suffix)}}})
for chain_suffix in ["pre", "log", "deny", "allow", "post"]:
rules.append({add_del: {"rule": {"family": family,
"table": TABLE_NAME,
"chain": "%s_%s" % (table, _policy),
"expr": [{"jump": {"target": "%s_%s_%s" % (table, _policy, chain_suffix)}}]}}})
target = self._fw.policy._policies[policy].target
if self._fw.get_log_denied() != "off":
if table == "filter":
if target in ["REJECT", "%%REJECT%%", "DROP"]:
log_suffix = target
if target == "%%REJECT%%":
log_suffix = "REJECT"
rules.append({add_del: {"rule": {"family": family,
"table": TABLE_NAME,
"chain": "%s_%s" % (table, _policy),
"expr": [self._pkttype_match_fragment(self._fw.get_log_denied()),
{"log": {"prefix": "\"filter_%s_%s: \"" % (_policy, log_suffix)}}]}}})
if table == "filter" and \
target in ["ACCEPT", "REJECT", "%%REJECT%%", "DROP"]:
if target in ["%%REJECT%%", "REJECT"]:
target_fragment = self._reject_fragment()
else:
target_fragment = {target.lower(): None}
rules.append({add_del: {"rule": {"family": family,
"table": TABLE_NAME,
"chain": "%s_%s" % (table, _policy),
"expr": [target_fragment]}}})
if not enable:
rules.reverse()
return rules
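# Summary of the per-policy layout created above: a "<table>_<policy>" chain
# plus five sub-chains suffixed _pre, _log, _deny, _allow and _post, visited
# via jumps in that order, optionally followed by a log rule and the final
# accept/reject/drop when the policy target asks for one. On disable the rule
# list is simply reversed so rules are removed before the chains they
# reference.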
def _pkttype_match_fragment(self, pkttype):
if pkttype == "all":
return {}
elif pkttype in ["unicast", "broadcast", "multicast"]:
return {"match": {"left": {"meta": {"key": "pkttype"}},
"op": "==",
"right": pkttype}}
raise FirewallError(INVALID_RULE, "Invalid pkttype \"%s\"" % pkttype)
def _reject_types_fragment(self, reject_type):
frags = {
# REJECT_TYPES : <nft reject rule fragment>
"icmp-host-prohibited" : {"reject": {"type": "icmp", "expr": "host-prohibited"}},
"host-prohib" : {"reject": {"type": "icmp", "expr": "host-prohibited"}},
"icmp-net-prohibited" : {"reject": {"type": "icmp", "expr": "net-prohibited"}},
"net-prohib" : {"reject": {"type": "icmp", "expr": "net-prohibited"}},
"icmp-admin-prohibited" : {"reject": {"type": "icmp", "expr": "admin-prohibited"}},
"admin-prohib" : {"reject": {"type": "icmp", "expr": "admin-prohibited"}},
"icmp6-adm-prohibited" : {"reject": {"type": "icmpv6", "expr": "admin-prohibited"}},
"adm-prohibited" : {"reject": {"type": "icmpv6", "expr": "admin-prohibited"}},
"icmp-net-unreachable" : {"reject": {"type": "icmp", "expr": "net-unreachable"}},
"net-unreach" : {"reject": {"type": "icmp", "expr": "net-unreachable"}},
"icmp-host-unreachable" : {"reject": {"type": "icmp", "expr": "host-unreachable"}},
"host-unreach" : {"reject": {"type": "icmp", "expr": "host-unreachable"}},
"icmp-port-unreachable" : {"reject": {"type": "icmp", "expr": "port-unreachable"}},
"icmp6-port-unreachable" : {"reject": {"type": "icmpv6", "expr": "port-unreachable"}},
"port-unreach" : {"reject": {"type": "icmpx", "expr": "port-unreachable"}},
"icmp-proto-unreachable" : {"reject": {"type": "icmp", "expr": "prot-unreachable"}},
"proto-unreach" : {"reject": {"type": "icmp", "expr": "prot-unreachable"}},
"icmp6-addr-unreachable" : {"reject": {"type": "icmpv6", "expr": "addr-unreachable"}},
"addr-unreach" : {"reject": {"type": "icmpv6", "expr": "addr-unreachable"}},
"icmp6-no-route" : {"reject": {"type": "icmpv6", "expr": "no-route"}},
"no-route" : {"reject": {"type": "icmpv6", "expr": "no-route"}},
"tcp-reset" : {"reject": {"type": "tcp reset"}},
"tcp-rst" : {"reject": {"type": "tcp reset"}},
}
return frags[reject_type]
def _reject_fragment(self):
return {"reject": {"type": "icmpx",
"expr": "admin-prohibited"}}
def _icmp_match_fragment(self):
return {"match": {"left": {"meta": {"key": "l4proto"}},
"op": "==",
"right": {"set": ["icmp", "icmpv6"]}}}
def _rich_rule_limit_fragment(self, limit):
if not limit:
return {}
rich_to_nft = {
"s" : "second",
"m" : "minute",
"h" : "hour",
"d" : "day",
}
rate, duration = limit.value_parse()
d = {
"rate": rate,
"per": rich_to_nft[duration],
}
burst = limit.burst_parse()
if burst is not None:
d["burst"] = burst
return {"limit": d}
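# Example (assuming the usual rich-rule limit syntax, e.g. value="3/m" with
# burst 10): value_parse() is expected to yield (3, "m") and burst_parse() 10,
# producing {"limit": {"rate": 3, "per": "minute", "burst": 10}}.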
def _rich_rule_chain_suffix(self, rich_rule):
if type(rich_rule.element) in [Rich_Masquerade, Rich_ForwardPort, Rich_IcmpBlock]:
# These are special and don't have an explicit action
pass
elif rich_rule.action:
if type(rich_rule.action) not in [Rich_Accept, Rich_Reject, Rich_Drop, Rich_Mark]:
raise FirewallError(INVALID_RULE, "Unknown action %s" % type(rich_rule.action))
else:
raise FirewallError(INVALID_RULE, "No rule action specified.")
if rich_rule.priority == 0:
if type(rich_rule.element) in [Rich_Masquerade, Rich_ForwardPort] or \
type(rich_rule.action) in [Rich_Accept, Rich_Mark]:
return "allow"
elif type(rich_rule.element) in [Rich_IcmpBlock] or \
type(rich_rule.action) in [Rich_Reject, Rich_Drop]:
return "deny"
elif rich_rule.priority < 0:
return "pre"
else:
return "post"
def _rich_rule_chain_suffix_from_log(self, rich_rule):
if not rich_rule.log and not rich_rule.audit:
raise FirewallError(INVALID_RULE, "Rich rule has neither log nor audit")
if rich_rule.priority == 0:
return "log"
elif rich_rule.priority < 0:
return "pre"
else:
return "post"
def _zone_interface_fragment(self):
return {"%%ZONE_INTERFACE%%": None}
def _zone_source_fragment(self, zone, address):
if check_single_address("ipv6", address):
address = normalizeIP6(address)
elif check_address("ipv6", address):
addr_split = address.split("/")
address = normalizeIP6(addr_split[0]) + "/" + addr_split[1]
return {"%%ZONE_SOURCE%%": {"zone": zone, "address": address}}
def _policy_priority_fragment(self, policy):
return {"%%POLICY_PRIORITY%%": policy.priority}
def _rich_rule_priority_fragment(self, rich_rule):
if not rich_rule or rich_rule.priority == 0:
return {}
return {"%%RICH_RULE_PRIORITY%%": rich_rule.priority}
def _rich_rule_log(self, policy, rich_rule, enable, table, expr_fragments):
if not rich_rule.log:
return {}
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
add_del = { True: "add", False: "delete" }[enable]
chain_suffix = self._rich_rule_chain_suffix_from_log(rich_rule)
log_options = {}
if rich_rule.log.prefix:
log_options["prefix"] = "%s" % rich_rule.log.prefix
if rich_rule.log.level:
level = "warn" if "warning" == rich_rule.log.level else rich_rule.log.level
log_options["level"] = "%s" % level
rule = {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s_%s" % (table, _policy, chain_suffix),
"expr": expr_fragments +
[self._rich_rule_limit_fragment(rich_rule.log.limit),
{"log": log_options}]}
rule.update(self._rich_rule_priority_fragment(rich_rule))
return {add_del: {"rule": rule}}
def _rich_rule_audit(self, policy, rich_rule, enable, table, expr_fragments):
if not rich_rule.audit:
return {}
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
add_del = { True: "add", False: "delete" }[enable]
chain_suffix = self._rich_rule_chain_suffix_from_log(rich_rule)
rule = {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s_%s" % (table, _policy, chain_suffix),
"expr": expr_fragments +
[self._rich_rule_limit_fragment(rich_rule.audit.limit),
{"log": {"level": "audit"}}]}
rule.update(self._rich_rule_priority_fragment(rich_rule))
return {add_del: {"rule": rule}}
def _rich_rule_action(self, policy, rich_rule, enable, table, expr_fragments):
if not rich_rule.action:
return {}
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
add_del = { True: "add", False: "delete" }[enable]
chain_suffix = self._rich_rule_chain_suffix(rich_rule)
chain = "%s_%s_%s" % (table, _policy, chain_suffix)
if type(rich_rule.action) == Rich_Accept:
rule_action = {"accept": None}
elif type(rich_rule.action) == Rich_Reject:
if rich_rule.action.type:
rule_action = self._reject_types_fragment(rich_rule.action.type)
else:
rule_action = {"reject": None}
elif type(rich_rule.action) == Rich_Drop:
rule_action = {"drop": None}
elif type(rich_rule.action) == Rich_Mark:
table = "mangle"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
chain = "%s_%s_%s" % (table, _policy, chain_suffix)
value = rich_rule.action.set.split("/")
if len(value) > 1:
rule_action = {"mangle": {"key": {"meta": {"key": "mark"}},
"value": {"^": [{"&": [{"meta": {"key": "mark"}}, value[1]]}, value[0]]}}}
else:
rule_action = {"mangle": {"key": {"meta": {"key": "mark"}},
"value": value[0]}}
else:
raise FirewallError(INVALID_RULE,
"Unknown action %s" % type(rich_rule.action))
rule = {"family": "inet",
"table": TABLE_NAME,
"chain": chain,
"expr": expr_fragments +
[self._rich_rule_limit_fragment(rich_rule.action.limit), rule_action]}
rule.update(self._rich_rule_priority_fragment(rich_rule))
return {add_del: {"rule": rule}}
def _rule_addr_fragment(self, addr_field, address, invert=False):
if address.startswith("ipset:"):
return self._set_match_fragment(address[len("ipset:"):], True if "daddr" == addr_field else False, invert)
else:
if check_mac(address):
family = "ether"
elif check_single_address("ipv4", address):
family = "ip"
elif check_address("ipv4", address):
family = "ip"
normalized_address = ipaddress.IPv4Network(address, strict=False)
address = {"prefix": {"addr": normalized_address.network_address.compressed, "len": normalized_address.prefixlen}}
elif check_single_address("ipv6", address):
family = "ip6"
address = normalizeIP6(address)
else:
family = "ip6"
addr_len = address.split("/")
address = {"prefix": {"addr": normalizeIP6(addr_len[0]), "len": int(addr_len[1])}}
return {"match": {"left": {"payload": {"protocol": family,
"field": addr_field}},
"op": "!=" if invert else "==",
"right": address}}
def _rich_rule_family_fragment(self, rich_family):
if not rich_family:
return {}
if rich_family not in ["ipv4", "ipv6"]:
raise FirewallError(INVALID_RULE,
"Invalid family \"%s\"" % rich_family)
return {"match": {"left": {"meta": {"key": "nfproto"}},
"op": "==",
"right": rich_family}}
def _rich_rule_destination_fragment(self, rich_dest):
if not rich_dest:
return {}
if rich_dest.addr:
address = rich_dest.addr
elif rich_dest.ipset:
address = "ipset:" + rich_dest.ipset
return self._rule_addr_fragment("daddr", address, invert=rich_dest.invert)
def _rich_rule_source_fragment(self, rich_source):
if not rich_source:
return {}
if rich_source.addr:
address = rich_source.addr
elif hasattr(rich_source, "mac") and rich_source.mac:
address = rich_source.mac
elif hasattr(rich_source, "ipset") and rich_source.ipset:
address = "ipset:" + rich_source.ipset
return self._rule_addr_fragment("saddr", address, invert=rich_source.invert)
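# Illustrations of _rule_addr_fragment(), used by the helpers above (example
# values only): "ipset:foo" delegates to _set_match_fragment(); a MAC address
# is matched against the "ether" header; "192.0.2.1" is matched literally as
# an ip saddr/daddr; "10.1.2.0/24" is normalized via ipaddress.IPv4Network and
# matched as {"prefix": {"addr": "10.1.2.0", "len": 24}}.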
def _port_fragment(self, port):
range = getPortRange(port)
if isinstance(range, int) and range < 0:
raise FirewallError(INVALID_PORT)
elif len(range) == 1:
return range[0]
else:
return {"range": [range[0], range[1]]}
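# Example (assuming getPortRange() returns integer tuples, as it is used
# elsewhere in firewalld): "80" yields 80, "8000-8010" yields
# {"range": [8000, 8010]}, and an unparsable port raises INVALID_PORT.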
def build_policy_ports_rules(self, enable, policy, proto, port, destination=None, rich_rule=None):
add_del = { True: "add", False: "delete" }[enable]
table = "filter"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
expr_fragments = []
if rich_rule:
expr_fragments.append(self._rich_rule_family_fragment(rich_rule.family))
if destination:
expr_fragments.append(self._rule_addr_fragment("daddr", destination))
if rich_rule:
expr_fragments.append(self._rich_rule_destination_fragment(rich_rule.destination))
expr_fragments.append(self._rich_rule_source_fragment(rich_rule.source))
expr_fragments.append({"match": {"left": {"payload": {"protocol": proto,
"field": "dport"}},
"op": "==",
"right": self._port_fragment(port)}})
if not rich_rule or type(rich_rule.action) != Rich_Mark:
expr_fragments.append({"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["new", "untracked"]}}})
rules = []
if rich_rule:
rules.append(self._rich_rule_log(policy, rich_rule, enable, table, expr_fragments))
rules.append(self._rich_rule_audit(policy, rich_rule, enable, table, expr_fragments))
rules.append(self._rich_rule_action(policy, rich_rule, enable, table, expr_fragments))
else:
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s_allow" % (table, _policy),
"expr": expr_fragments + [{"accept": None}]}}})
return rules
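# Sketch of the plain (non rich-rule) case: opening e.g. tcp port 443 on a
# policy appends one rule to "filter_<policy>_allow" that matches tcp dport
# 443 plus ct state {new, untracked} and ends in {"accept": None}.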
def build_policy_protocol_rules(self, enable, policy, protocol, destination=None, rich_rule=None):
add_del = { True: "add", False: "delete" }[enable]
table = "filter"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
expr_fragments = []
if rich_rule:
expr_fragments.append(self._rich_rule_family_fragment(rich_rule.family))
if destination:
expr_fragments.append(self._rule_addr_fragment("daddr", destination))
if rich_rule:
expr_fragments.append(self._rich_rule_destination_fragment(rich_rule.destination))
expr_fragments.append(self._rich_rule_source_fragment(rich_rule.source))
expr_fragments.append({"match": {"left": {"meta": {"key": "l4proto"}},
"op": "==",
"right": protocol}})
if not rich_rule or type(rich_rule.action) != Rich_Mark:
expr_fragments.append({"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["new", "untracked"]}}})
rules = []
if rich_rule:
rules.append(self._rich_rule_log(policy, rich_rule, enable, table, expr_fragments))
rules.append(self._rich_rule_audit(policy, rich_rule, enable, table, expr_fragments))
rules.append(self._rich_rule_action(policy, rich_rule, enable, table, expr_fragments))
else:
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s_allow" % (table, _policy),
"expr": expr_fragments + [{"accept": None}]}}})
return rules
def build_policy_source_ports_rules(self, enable, policy, proto, port,
destination=None, rich_rule=None):
add_del = { True: "add", False: "delete" }[enable]
table = "filter"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
expr_fragments = []
if rich_rule:
expr_fragments.append(self._rich_rule_family_fragment(rich_rule.family))
if destination:
expr_fragments.append(self._rule_addr_fragment("daddr", destination))
if rich_rule:
expr_fragments.append(self._rich_rule_destination_fragment(rich_rule.destination))
expr_fragments.append(self._rich_rule_source_fragment(rich_rule.source))
expr_fragments.append({"match": {"left": {"payload": {"protocol": proto,
"field": "sport"}},
"op": "==",
"right": self._port_fragment(port)}})
if not rich_rule or type(rich_rule.action) != Rich_Mark:
expr_fragments.append({"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["new", "untracked"]}}})
rules = []
if rich_rule:
rules.append(self._rich_rule_log(policy, rich_rule, enable, table, expr_fragments))
rules.append(self._rich_rule_audit(policy, rich_rule, enable, table, expr_fragments))
rules.append(self._rich_rule_action(policy, rich_rule, enable, table, expr_fragments))
else:
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s_allow" % (table, _policy),
"expr": expr_fragments + [{"accept": None}]}}})
return rules
def build_policy_helper_ports_rules(self, enable, policy, proto, port,
destination, helper_name, module_short_name):
table = "filter"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
add_del = { True: "add", False: "delete" }[enable]
rules = []
if enable:
rules.append({"add": {"ct helper": {"family": "inet",
"table": TABLE_NAME,
"name": "helper-%s-%s" % (helper_name, proto),
"type": module_short_name,
"protocol": proto}}})
expr_fragments = []
if destination:
expr_fragments.append(self._rule_addr_fragment("daddr", destination))
expr_fragments.append({"match": {"left": {"payload": {"protocol": proto,
"field": "dport"}},
"op": "==",
"right": self._port_fragment(port)}})
expr_fragments.append({"ct helper": "helper-%s-%s" % (helper_name, proto)})
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s_allow" % (_policy),
"expr": expr_fragments}}})
return rules
def build_zone_forward_rules(self, enable, zone, policy, table, interface=None, source=None):
add_del = { True: "add", False: "delete" }[enable]
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
rules = []
if interface:
if interface[len(interface)-1] == "+":
interface = interface[:len(interface)-1] + "*"
expr = [{"match": {"left": {"meta": {"key": "oifname"}},
"op": "==",
"right": interface}},
{"accept": None}]
else: # source
expr = [self._rule_addr_fragment("daddr", source), {"accept": None}]
rule = {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s_allow" % (_policy),
"expr": expr}
rules.append({add_del: {"rule": rule}})
return rules
def _build_policy_masquerade_nat_rules(self, enable, policy, family, rich_rule=None):
table = "nat"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX, isSNAT=True)
add_del = { True: "add", False: "delete" }[enable]
expr_fragments = []
if rich_rule:
expr_fragments.append(self._rich_rule_destination_fragment(rich_rule.destination))
expr_fragments.append(self._rich_rule_source_fragment(rich_rule.source))
chain_suffix = self._rich_rule_chain_suffix(rich_rule)
else:
chain_suffix = "allow"
rule = {"family": family,
"table": TABLE_NAME,
"chain": "nat_%s_%s" % (_policy, chain_suffix),
"expr": expr_fragments +
[{"match": {"left": {"meta": {"key": "oifname"}},
"op": "!=",
"right": "lo"}},
{"masquerade": None}]}
rule.update(self._rich_rule_priority_fragment(rich_rule))
return [{add_del: {"rule": rule}}]
def build_policy_masquerade_rules(self, enable, policy, rich_rule=None):
# nat tables need to use ip/ip6 family
rules = []
if rich_rule and (rich_rule.family and rich_rule.family == "ipv6"
or rich_rule.source and check_address("ipv6", rich_rule.source.addr)):
rules.extend(self._build_policy_masquerade_nat_rules(enable, policy, "ip6", rich_rule))
elif rich_rule and (rich_rule.family and rich_rule.family == "ipv4"
or rich_rule.source and check_address("ipv4", rich_rule.source.addr)):
rules.extend(self._build_policy_masquerade_nat_rules(enable, policy, "ip", rich_rule))
else:
rules.extend(self._build_policy_masquerade_nat_rules(enable, policy, "ip", rich_rule))
table = "filter"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
add_del = { True: "add", False: "delete" }[enable]
expr_fragments = []
if rich_rule:
expr_fragments.append(self._rich_rule_destination_fragment(rich_rule.destination))
expr_fragments.append(self._rich_rule_source_fragment(rich_rule.source))
chain_suffix = self._rich_rule_chain_suffix(rich_rule)
else:
chain_suffix = "allow"
rule = {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_%s_%s" % (_policy, chain_suffix),
"expr": expr_fragments +
[{"match": {"left": {"ct": {"key": "state"}},
"op": "in",
"right": {"set": ["new", "untracked"]}}},
{"accept": None}]}
rule.update(self._rich_rule_priority_fragment(rich_rule))
rules.append({add_del: {"rule": rule}})
return rules
def _build_policy_forward_port_nat_rules(self, enable, policy, port, protocol,
toaddr, toport, family,
rich_rule=None):
table = "nat"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
add_del = { True: "add", False: "delete" }[enable]
expr_fragments = []
if rich_rule:
expr_fragments.append(self._rich_rule_destination_fragment(rich_rule.destination))
expr_fragments.append(self._rich_rule_source_fragment(rich_rule.source))
chain_suffix = self._rich_rule_chain_suffix(rich_rule)
else:
chain_suffix = "allow"
expr_fragments.append({"match": {"left": {"payload": {"protocol": protocol,
"field": "dport"}},
"op": "==",
"right": self._port_fragment(port)}})
if toaddr:
if check_single_address("ipv6", toaddr):
toaddr = normalizeIP6(toaddr)
if toport and toport != "":
expr_fragments.append({"dnat": {"addr": toaddr, "port": self._port_fragment(toport)}})
else:
expr_fragments.append({"dnat": {"addr": toaddr}})
else:
expr_fragments.append({"redirect": {"port": self._port_fragment(toport)}})
rule = {"family": family,
"table": TABLE_NAME,
"chain": "nat_%s_%s" % (_policy, chain_suffix),
"expr": expr_fragments}
rule.update(self._rich_rule_priority_fragment(rich_rule))
return [{add_del: {"rule": rule}}]
def build_policy_forward_port_rules(self, enable, policy, port,
protocol, toport, toaddr, rich_rule=None):
rules = []
if rich_rule and (rich_rule.family and rich_rule.family == "ipv6"
or toaddr and check_single_address("ipv6", toaddr)):
rules.extend(self._build_policy_forward_port_nat_rules(enable, policy,
port, protocol, toaddr, toport, "ip6", rich_rule))
elif rich_rule and (rich_rule.family and rich_rule.family == "ipv4"
or toaddr and check_single_address("ipv4", toaddr)):
rules.extend(self._build_policy_forward_port_nat_rules(enable, policy,
port, protocol, toaddr, toport, "ip", rich_rule))
else:
if toaddr and check_single_address("ipv6", toaddr):
rules.extend(self._build_policy_forward_port_nat_rules(enable, policy,
port, protocol, toaddr, toport, "ip6", rich_rule))
else:
rules.extend(self._build_policy_forward_port_nat_rules(enable, policy,
port, protocol, toaddr, toport, "ip", rich_rule))
return rules
def _icmp_types_to_nft_fragments(self, ipv, icmp_type):
if icmp_type in ICMP_TYPES_FRAGMENTS[ipv]:
return ICMP_TYPES_FRAGMENTS[ipv][icmp_type]
else:
raise FirewallError(INVALID_ICMPTYPE,
"ICMP type '%s' not supported by %s for %s" % (icmp_type, self.name, ipv))
def build_policy_icmp_block_rules(self, enable, policy, ict, rich_rule=None):
table = "filter"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
add_del = { True: "add", False: "delete" }[enable]
if rich_rule and rich_rule.ipvs:
ipvs = rich_rule.ipvs
elif ict.destination:
ipvs = []
if "ipv4" in ict.destination:
ipvs.append("ipv4")
if "ipv6" in ict.destination:
ipvs.append("ipv6")
else:
ipvs = ["ipv4", "ipv6"]
rules = []
for ipv in ipvs:
if self._fw.policy.query_icmp_block_inversion(policy):
final_chain = "%s_%s_allow" % (table, _policy)
target_fragment = {"accept": None}
else:
final_chain = "%s_%s_deny" % (table, _policy)
target_fragment = self._reject_fragment()
expr_fragments = []
if rich_rule:
expr_fragments.append(self._rich_rule_family_fragment(rich_rule.family))
expr_fragments.append(self._rich_rule_destination_fragment(rich_rule.destination))
expr_fragments.append(self._rich_rule_source_fragment(rich_rule.source))
expr_fragments.extend(self._icmp_types_to_nft_fragments(ipv, ict.name))
if rich_rule:
rules.append(self._rich_rule_log(policy, rich_rule, enable, table, expr_fragments))
rules.append(self._rich_rule_audit(policy, rich_rule, enable, table, expr_fragments))
if rich_rule.action:
rules.append(self._rich_rule_action(policy, rich_rule, enable, table, expr_fragments))
else:
chain_suffix = self._rich_rule_chain_suffix(rich_rule)
rule = {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s_%s" % (table, _policy, chain_suffix),
"expr": expr_fragments + [self._reject_fragment()]}
rule.update(self._rich_rule_priority_fragment(rich_rule))
rules.append({add_del: {"rule": rule}})
else:
if self._fw.get_log_denied() != "off" and not self._fw.policy.query_icmp_block_inversion(policy):
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": final_chain,
"expr": (expr_fragments +
[self._pkttype_match_fragment(self._fw.get_log_denied()),
{"log": {"prefix": "\"%s_%s_ICMP_BLOCK: \"" % (table, policy)}}])}}})
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": final_chain,
"expr": expr_fragments + [target_fragment]}}})
return rules
def build_policy_icmp_block_inversion_rules(self, enable, policy):
table = "filter"
_policy = self._fw.policy.policy_base_chain_name(policy, table, POLICY_CHAIN_PREFIX)
rules = []
add_del = { True: "add", False: "delete" }[enable]
if self._fw.policy.query_icmp_block_inversion(policy):
target_fragment = self._reject_fragment()
else:
target_fragment = {"accept": None}
# WARN: The "index" used here must be kept in sync with
# build_policy_chain_rules()
#
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s" % (table, _policy),
"index": 4,
"expr": [self._icmp_match_fragment(),
target_fragment]}}})
if self._fw.get_log_denied() != "off" and self._fw.policy.query_icmp_block_inversion(policy):
rules.append({add_del: {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "%s_%s" % (table, _policy),
"index": 4,
"expr": [self._icmp_match_fragment(),
self._pkttype_match_fragment(self._fw.get_log_denied()),
{"log": {"prefix": "%s_%s_ICMP_BLOCK: " % (table, policy)}}]}}})
return rules
def build_rpfilter_rules(self, log_denied=False):
rules = []
expr_fragments = [{"match": {"left": {"meta": {"key": "nfproto"}},
"op": "==",
"right": "ipv6"}},
{"match": {"left": {"fib": {"flags": ["saddr", "iif", "mark"],
"result": "oif"}},
"op": "==",
"right": False}}]
if log_denied != "off":
expr_fragments.append({"log": {"prefix": "rpfilter_DROP: "}})
expr_fragments.append({"drop": None})
rules.append({"insert": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_PREROUTING",
"expr": expr_fragments}}})
# RHBZ#1058505, RHBZ#1575431 (bug in kernel 4.16-4.17)
rules.append({"insert": {"rule": {"family": "inet",
"table": TABLE_NAME,
"chain": "filter_PREROUTING",
"expr": [{"match": {"left": {"payload": {"protocol": "icmpv6",
"field": "type"}},
"op": "==",
"right": {"set": ["nd-router-advert", "nd-neighbor-solicit"]}}},
{"accept": None}]}}})
return rules
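# Editorial note: the fib match above asks the kernel whether a route back to
# the packet's source exists via the incoming interface (flags saddr/iif/mark,
# result oif); a False result means the reverse-path lookup failed, so the
# packet is (optionally) logged and dropped. The second insert exempts IPv6
# router advertisements and neighbour solicitations, per the referenced RHBZ
# kernel issue.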
def build_rfc3964_ipv4_rules(self):
daddr_set = ["::0.0.0.0/96", # IPv4 compatible
"::ffff:0.0.0.0/96", # IPv4 mapped
"2002:0000::/24", # 0.0.0.0/8 (the system has no address assigned yet)
"2002:0a00::/24", # 10.0.0.0/8 (private)
"2002:7f00::/24", # 127.0.0.0/8 (loopback)
"2002:ac10::/28", # 172.16.0.0/12 (private)
"2002:c0a8::/32", # 192.168.0.0/16 (private)
"2002:a9fe::/32", # 169.254.0.0/16 (IANA Assigned