| Server IP : 170.10.162.208 / Your IP : 216.73.216.181 Web Server : LiteSpeed System : Linux altar19.supremepanel19.com 4.18.0-553.69.1.lve.el8.x86_64 #1 SMP Wed Aug 13 19:53:59 UTC 2025 x86_64 User : deltahospital ( 1806) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /home/deltahospital/.cagefs/tmp/ |
Upload File : |
{
"default_version": "10",
"selector_enabled": true
}alt-nodejs6
alt-nodejs8
alt-nodejs9
alt-nodejs10
alt-nodejs11
alt-nodejs12
alt-nodejs14
alt-nodejs16
{
"default_version": "3.10",
"selector_enabled": true
}alt-python39
alt-python33
alt-python35
alt-python34
alt-python37
alt-python36
alt-python310
alt-python27
alt-python38
{"ui_config": {"inodeLimits": {"showUserInodesUsage": true}, "uiSettings": {"hideRubyApp": false, "hideLVEUserStat": false, "hidePythonApp": false, "hideNodeJsApp": false, "hidePHPextensions": false, "hideDomainsTab": false, "hidePhpApp": false, "hideXrayApp": false, "hideAccelerateWPApp": false}}}#!/bin/bash
if [[ x"${BASH_SOURCE[0]}" == x"$0" ]]; then
echo "'activate' script should be sourced, not run directly"
exit 1;
fi
CWD=$(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd)
NEW_VIRTUAL_ENV_PATH="${CWD%/bin}"
deactivate () {
if [[ x"${BASH_SOURCE[0]}" == x"$0" ]]; then
echo "'deactivate' script should be sourced, not run directly"
exit 1;
fi
# Only restore from backup variables if they are set
# But include the case when they are set to be empty
if [[ ${BKP_PATH+"is_set"} == "is_set" ]]; then
PATH="$BKP_PATH"
export PATH
fi
if [[ ${BKP_NODE_PATH+"is_set"} == "is_set" ]]; then
NODE_PATH="$BKP_NODE_PATH"
export NODE_PATH
fi
if [[ ! -z $BKP_PS1 ]]; then
PS1="$BKP_PS1"
export PS1
fi
unset -v BKP_PATH
unset -v BKP_NODE_PATH
unset -v BKP_PS1
unset -v CL_VIRTUAL_ENV
unset -v CL_APP_ROOT
unset -v CL_NODEHOME
unset -v CL_NODEJS_VERSION
if [ ! "${1-}" = "nondestructive" ] ; then
# Self destruct!
unset -f deactivate
fi
}
activate () {
CL_VIRTUAL_ENV="${NEW_VIRTUAL_ENV_PATH}"
CL_NODEJS_VERSION="$(echo "${CL_VIRTUAL_ENV}" | awk -F '/' '{print $NF}')"
CL_APP_ROOT="${CL_VIRTUAL_ENV#$HOME/nodevenv/}" # cut $HOME/nodevenv/
CL_APP_ROOT="${CL_APP_ROOT%/$CL_NODEJS_VERSION}" # cut nodejs version
CL_NODEHOME="/opt/alt/alt-nodejs${CL_NODEJS_VERSION}/root"
BKP_NODE_PATH="$NODE_PATH"
NODE_PATH="$CL_VIRTUAL_ENV/lib/node_modules:$CL_NODEHOME/lib/node_modules:$CL_NODEHOME/lib64/node_modules:$NODE_PATH"
BKP_PATH="$PATH"
PATH="$CL_VIRTUAL_ENV/bin:$CL_NODEHOME/usr/bin:$CL_VIRTUAL_ENV/lib/bin/:$PATH"
if [[ -z "$CL_VIRTUAL_ENV_DISABLE_PROMPT" ]] ; then
BKP_PS1="$PS1"
PS1="[${CL_APP_ROOT} ($CL_NODEJS_VERSION)] $PS1"
fi
export BKP_PS1
export BKP_PATH
export BKP_NODE_PATH
export PS1
export CL_VIRTUAL_ENV
export CL_APP_ROOT
export CL_NODEHOME
export NODE_PATH
export PATH
}
# compare current virtual environment (that is stored in CL_VIRTUAL_ENV) path
# to the NEW_VIRTUAL_ENV_PATH, that is the path of the new environment we may enter too
# just do nothing if paths are equal
if [ "${CL_VIRTUAL_ENV}" != "${NEW_VIRTUAL_ENV_PATH}" ]; then
deactivate nondestructive
activate
fi
# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import json
import logging
import subprocess
import os
import sys
from libcloudlinux import (
CloudlinuxCliBase, LVEMANAGER_PLUGIN_NAMES, DEFAULT_PLUGIN_NAME, PASSENGER_DEPEND_PLUGINS,
AllLimitStrategy, NoLimitStrategy, LimitStrategyHeavy, NoCagefsStrategy, LimitStrategyBase, ConfigLimitValue,
)
from clselector.clpassenger_detectlib import is_clpassenger_active
from clcommon import ClPwd
from clcommon.utils import is_litespeed_running
from clcommon.lib.cledition import is_cl_solo_edition
from cldetectlib import get_param_from_file
from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported
CONFIG = '/etc/sysconfig/cloudlinux'
SMART_ADVICE_USER_CLI = '/opt/alt/php-xray/cl-smart-advice-user'
PERCENTS_STATS_MODE_FLAG = '/opt/cloudlinux/flags/enabled-flags.d/percentage-user-stats-mode.flag'
# NB: this logger's out is stderr, result JSON out is stdout - so with active logger web will not work properly
# because of stderr redirection 2>&1
# so it is MUST be silent(NOTSET) in normal situation
# also it is not possible to use file logger here - script works inside the cagefs with user's rights
logger = logging.getLogger(__name__)
logger.setLevel(logging.NOTSET)
init_formatter = logging.Formatter('[%(asctime)s] %(funcName)s:%(lineno)s - %(message)s')
cagefs_formatter = logging.Formatter('{cagefs} [%(asctime)s] %(funcName)s:%(lineno)s - %(message)s')
h = logging.StreamHandler()
h.setFormatter(init_formatter)
logger.addHandler(h)
logger.debug('cli start')
class CloudlinuxCliUser(CloudlinuxCliBase):
limit_strategy: LimitStrategyBase
def __init__(self):
self.web_resource_limit_mode = ConfigLimitValue.ALL
limit_mode = get_param_from_file(CONFIG, 'web_resource_limit_mode', '=', ConfigLimitValue.ALL.value)
self.web_resource_limit_mode = ConfigLimitValue(limit_mode)
super(CloudlinuxCliUser, self).__init__()
self.command_methods.update({
'spa-get-domains': self.spa_user_domains,
'spa-get-homedir': self.spa_user_homedir,
'cloudlinux-snapshots': self.cl_snapshots,
'spa-get-user-info': self.spa_get_user_info
})
def __init_limit_strategy(self):
"""
Set default strategy from the `CONFIG` values
"""
if self.skip_cagefs_check:
logger.handlers[0].setFormatter(cagefs_formatter) # update log format to easier log review
# we cannot use cagefs when it is not available
if not is_panel_feature_supported(Feature.CAGEFS):
self.limit_strategy = NoCagefsStrategy()
else:
self.limit_strategy = {
ConfigLimitValue.ALL: AllLimitStrategy,
ConfigLimitValue.HEAVY: LimitStrategyHeavy,
ConfigLimitValue.UNLIMITED: NoLimitStrategy,
}.get(self.web_resource_limit_mode, AllLimitStrategy)()
logger.debug(
f'Limits strategy inited as {self.limit_strategy.__class__}'
f'\n\tBecause of:'
f'\n\tself.web_resource_limit_mode: {self.web_resource_limit_mode}'
)
def set_limit_strategy(self, strategy: LimitStrategyBase):
logger.debug(f'Limit strategy is explicitly set to {strategy.__class__}')
self.limit_strategy = strategy
def __check_exclusive_commands(self):
"""
Check is command currently run without cagefs; commands list is taken from Spa.php `processRequest()`
This function should be removed in the same task as the `LimitStrategyNoCagefs`
"""
data = self.request_data
if data.get('params', {}).get('interpreter') == 'php' or data.get('command') in {
'cloudlinux-statistics',
'cloudlinux-quota',
'cloudlinux-top',
'cloudlinux-snapshots',
'cloudlinux-charts',
'cloudlinux-statsnotifier',
'spa-get-user-info',
'cloudlinux-awp-user',
'cloudlinux-xray-user-manager',
'spa-get-domains',
'cpanel-api',
'cl-smart-advice-user',
}:
logger.debug('Executable command found in the exclusive list')
self.set_limit_strategy(NoCagefsStrategy())
def drop_permission(self):
"""
Drop permission to users, if owner of script is user
:return:
"""
logger.debug(
'drop permissions start'
f'\n\targv is: {sys.argv}'
f'\n\trequest data is: {self.request_data}'
)
self.__init_limit_strategy()
data = self.request_data
if data['owner'] != 'user':
self.exit_with_error("User not allowed")
super(CloudlinuxCliUser, self).drop_permission()
args = self.prepair_params_for_command()
logger.debug(f'prepared args is: {args}')
if data.get('command'):
if self.skip_cagefs_check:
logger.debug('cagefs skipped: --skip-cagefs-check arg found')
else:
self.__check_exclusive_commands()
# if rc is None - script won't enter the cagefs
# otherwise - command is executed in the cagefs
rc = self.limit_strategy.execute(data['command'], args, self.request_data)
if rc is not None:
logger.debug(f'command executed inside of the cagefs with rc: {rc}')
sys.exit(rc)
else:
logger.debug(f'cagefs skipped: strategy is {self.limit_strategy.__class__}')
# skip checking plugin availability on spa-get-user-info
if data.get('command') != 'spa-get-user-info':
self.check_plugin_availability()
logger.debug('drop permissons end')
def spa_user_domains(self):
print(json.dumps({"result": "success", "list": self.get_user_domains()}))
sys.exit(0)
def spa_user_homedir(self):
print(json.dumps({"result": "success", "homedir": self.get_user_homedir()}))
sys.exit(0)
def spa_get_user_info(self):
try:
print(json.dumps(
{
"result": "success",
"domains": self.get_user_domains(),
"homedir": self.get_user_homedir(),
"is_litespeed_running": is_litespeed_running(),
"is_cl_solo_edition": is_cl_solo_edition(skip_jwt_check=True),
"smart_advice": os.path.isfile(SMART_ADVICE_USER_CLI),
"is_lve_supported": is_panel_feature_supported(Feature.LVE),
"user_stats_mode": self.get_stats_mode(),
"server_ip": self.get_server_ip()
}))
except:
self.exit_with_error('Module unavailable')
sys.exit(0)
def get_user_domains(self):
try:
from clcommon.cpapi import userdomains
except:
self.exit_with_error('Module unavailable')
return [x[0] for x in userdomains(self.user_info['username'])]
def get_stats_mode(self):
if os.path.isfile(PERCENTS_STATS_MODE_FLAG):
return 'percent'
return 'default'
def get_user_homedir(self):
try:
pwdir = ClPwd().get_homedir(self.user_info['username'])
return pwdir + "/"
except KeyError:
self.exit_with_error('No such user')
def cl_snapshots(self):
list_to_request = self.prepair_params_for_command()
try:
output = self.run_util('/usr/sbin/lve-read-snapshot', *list_to_request)
except subprocess.CalledProcessError as processError:
output = processError.output
try:
result = json.loads(output)
except:
self.exit_with_error(output)
return
self.exit_with_success({'data': result['data']})
sys.exit(0)
def check_plugin_availability(self):
plugin_names = {
'nodejs_selector': 'Node.js Selector',
'python_selector': 'Python Selector',
}
selector_enabled = True
manager = None
try:
if self.current_plugin_name == 'nodejs_selector':
from clselect.clselectnodejs.node_manager import NodeManager
manager = NodeManager()
if self.current_plugin_name == 'python_selector':
from clselect.clselectpython.python_manager import PythonManager
manager = PythonManager()
if manager:
selector_enabled = manager.selector_enabled
except:
selector_enabled = False
if not selector_enabled:
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': plugin_names.get(self.current_plugin_name, 'Plugin')},
icon='disabled')
plugin_available_checker = {
'nodejs_selector': self._plugin_available_nodejs,
'python_selector': self._plugin_available_python,
'php_selector': self._plugin_available_php,
'resource_usage': self._plugin_available_resource_usage,
}.get(self.current_plugin_name)
if plugin_available_checker:
plugin_available = plugin_available_checker()
else:
plugin_available = True
if not is_clpassenger_active() and self.current_plugin_name in PASSENGER_DEPEND_PLUGINS:
self.exit_with_error(
code=503,
error_id='ERROR.not_available_passenger',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
if not plugin_available:
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
def _plugin_available_nodejs(self):
try:
from clselect.clselectnodejs.node_manager import NodeManager
manager = NodeManager()
if not manager.selector_enabled or not is_clpassenger_active():
return False
except:
return False
return True
def _plugin_available_python(self):
try:
from clselect.clselectpython.python_manager import PythonManager
manager = PythonManager()
if not manager.selector_enabled or not is_clpassenger_active():
return False
except:
return False
return True
def _plugin_available_php(self):
try:
from clselect.clselectphp.php_manager import PhpManager
manager = PhpManager()
if not manager.selector_enabled:
return False
except:
return False
return True
def _plugin_available_resource_usage(self):
return True
#!/opt/cloudlinux/venv/bin/python3 -bb
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import os
import sys
import getpass
from future.utils import iteritems
from clselect.clselectnodejs.apps_manager import ApplicationsManager as NodeJsAppsManager
from clselect.clselectpython.apps_manager import ApplicationsManager as PythonAppsManager, get_venv_rel_path
from clselect.utils import get_using_realpath_keys
def get_app_name(interpreter):
"""
Get application name via CL_APP_ROOT variable
:param interpreter: interpreter name
:return: str application name
"""
if os.environ.get('CL_APP_ROOT'):
return os.environ['CL_APP_ROOT']
elif interpreter == 'python':
abs_venv_path = os.environ['VIRTUAL_ENV']
user_home = os.environ['HOME']
# /home/<username>/virtualenv/<name>/<version> ->
# /virtualenv/<name>
vevn_rel_path = os.path.dirname(abs_venv_path).replace(user_home + '/', '', 1)
# if VENV_REL_PATH contains _ we cannot
# clearly define app_root and should guess
if '_' in vevn_rel_path:
username = getpass.getuser()
# in python CL_APP_ROOT is not set, we must guess by env path
for app_root in PythonAppsManager().get_user_config_data(username):
_, rel_path = get_venv_rel_path(username, app_root)
if rel_path == vevn_rel_path:
return app_root
return None
else:
return vevn_rel_path.replace('virtualenv/', '', 1)
else:
raise NotImplementedError(
'I don\'t know how to get app_root for %s' % interpreter)
def get_env_vars(_app_name, interpreter):
"""
Get environment variables from user config for given application name
:param _app_name: application name
:param interpreter: interpreter name
:return: dict {ENV_VAR_NAME: VALUE}
"""
_env_vars = {}
username = getpass.getuser()
try:
full_app_config = get_app_full_conf(username, _app_name, interpreter)
if interpreter == 'nodejs':
_env_vars['NODE_ENV'] = full_app_config['app_mode']
_env_vars.update(full_app_config['env_vars'])
except KeyError:
pass
return _env_vars
def set_env_vars(dict_env_vars):
"""
Print to stdout bash strings with environment variables
:param dict_env_vars: dict with environment variables
:return: None
"""
for key, var in iteritems(dict_env_vars):
print('export {}="{}"'.format(key, var))
def is_root():
return os.geteuid() == 0
def get_app_full_conf(user, app, interpreter):
if interpreter == 'nodejs':
manager = NodeJsAppsManager()
elif interpreter == 'python':
manager = PythonAppsManager()
else:
raise NotImplementedError()
full_user_config = manager.get_user_config_data(user)
if not full_user_config:
print("User config was not found or empty")
sys.exit(0)
return get_using_realpath_keys(user, app, full_user_config)
if __name__ == "__main__":
if is_root():
print("This program is not intended to be run as root.")
sys.exit(1)
args = sys.argv
if len(args) < 2:
print("Interpreter is not passed.")
sys.exit(1)
app_name = get_app_name(sys.argv[1])
if app_name is None:
print("Unknown application.")
sys.exit(1)
env_vars = get_env_vars(app_name, sys.argv[1])
set_env_vars(env_vars)
# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
from __future__ import print_function
from lvemanager.helpers import exit_with_error, run_command
import json
from abc import abstractmethod
from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError
import sys
OWNER_ADMIN = 'admin'
OWNER_USER = 'user'
UAPI_IGNORED_ERRORS = ["The event UAPI::LangPHP::php_set_vhost_versions was handled successfully."]
def get_cpanel_api_class(owner):
"""
Factory method that returns
certain class depending on
owner's type
:param owner: Can be 'admin' or 'user'
:return:
"""
if owner == OWNER_ADMIN:
return CPanelAdminApi()
return CPanelUserApi()
class CPanelApi(object):
"""
Abstract method that defines abstract methods
that must be implemented and common methods of
the derived classes
"""
RETURN_JSON = '--output=json'
ALLOWED_OPERATIONS = {}
@abstractmethod
def check_operation_allowed(self, called_method):
"""
Checks whether operation is allowed to perform
:param called_method:
:return:
"""
raise NotImplementedError('Method check_operation_allowed must be implemented!')
@abstractmethod
def prepare_running_command(self, called_method, params, return_json):
"""
Depending on the passed arguments,
the method builds running command
"""
raise NotImplementedError('Method prepare_running_command must be implemented!')
@abstractmethod
def parse_result(self, called_method, result):
"""
Some results are parsed before sending back
to the called. This method parses it and returns
transformed result.
"""
raise NotImplementedError('Method parse_result must be implemented!')
@staticmethod
def check_method_allowed(method_name, methods):
"""
CPanel API contains several methods and
not all of them if supported by this utility.
"""
for method_object in methods:
if method_name == method_object['method']:
return True
return False
def check_for_errors(self, result):
"""
Checks whether CPanel API returned
error or not
:param result:
:return:
"""
raise NotImplementedError('Method check_for_errors must be implemented!')
@staticmethod
def get_method_parser(method_name, methods):
"""
Some result must be parsed before
they are sent back to the caller. This
method extracts the method that must be executed
upon the result to transform it.
"""
for method_object in methods:
if method_name == method_object['method']:
return method_object.get('parser', None)
def get_ignore_errors(self, method_name, methods):
"""
return ignore error flag for method
:return:
"""
for method_object in methods:
if method_name == method_object['method']:
return method_object.get('ignore_errors', None)
@staticmethod
def parse_vhost_versions(result):
"""
Method parses the result of the method 'parse_vhost_versions'.
The host will be inherited when its field 'sys_default'
inside 'php_version_source' is equal to 1.
"""
parsed_result = []
for r in result:
parsed_result.append({
'version': r.get('version'),
'host': r.get('vhost'),
'php_fpm': True if r.get('php_fpm') else False,
'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False
})
return parsed_result
def run(self, called_method, params, return_json=True):
"""
Default method which first checks whether operation
is allowed then if allowed runs prepared command.
After receiving the result, it will call parse_result method
to transform result into desired format.
"""
if self.check_operation_allowed(called_method):
prepared_command = self.prepare_running_command(called_method, params, return_json)
code, output, std_err = run_command(prepared_command, return_full_output=True)
if code != 0:
exit_with_error(std_err or 'output of the command: %s\n%s' % (prepared_command, output))
try:
result = json.loads(output)
return self.parse_result(called_method, result)
except ValueError as e:
exit_with_error(e)
else:
exit_with_error('Not allowed operation: {}'.format(called_method))
class CPanelAdminApi(CPanelApi):
COMMAND = '/usr/local/cpanel/bin/whmapi1'
ALLOWED_OPERATIONS = [
{
'method': 'php_get_vhost_versions',
'parser': lambda result: CPanelAdminApi.parse_vhost_versions(result)
},
{'method': 'php_get_system_default_version'},
{'method': 'php_set_vhost_versions'},
{'method': 'php_get_installed_versions'}
]
def check_operation_allowed(self, called_method):
return self.check_method_allowed(called_method, self.ALLOWED_OPERATIONS)
def prepare_running_command(self, called_method, params, return_json):
transformed_params = {}
for param in params:
key, value = param.split('=')
transformed_params[key] = value
return transformed_params
def parse_result(self, called_method, result):
parser = self.get_method_parser(called_method, self.ALLOWED_OPERATIONS)
if parser is not None:
return parser(result)
return result
@staticmethod
def parse_vhost_versions(result):
parsed_result = []
for r in result['versions']:
parsed_result.append({
'version': r.get('version'),
'host': r.get('vhost'),
'user': r.get('account'),
'php_fpm': True if r.get('php_fpm') else False,
'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False
})
return parsed_result
def run(self, called_method, params, return_json=True):
params = self.prepare_running_command(called_method, params, return_json)
if self.check_operation_allowed(called_method):
try:
return self.parse_result(called_method, WhmApiRequest(called_method).with_arguments(**params).call())
except WhmApiError as e:
print(e)
sys.exit(1)
else:
exit_with_error('Not allowed operation: {}'.format(called_method))
class CPanelUserApi(CPanelApi):
COMMAND = '/usr/bin/uapi'
ALLOWED_OPERATIONS = {
'LangPHP': [
{'method': 'php_set_vhost_versions'},
{
'method': 'php_get_installed_versions',
'ignore_errors': True,
},
{
'method': 'php_get_vhost_versions',
'parser': lambda result: CPanelApi.parse_vhost_versions(result),
'ignore_errors': True,
},
{
'method': 'php_get_system_default_version',
'ignore_errors': True,
}
]
}
def check_class_allowed(self, class_name):
"""
CPanel UAPI requires to pass class name and
only few of classes are supported by this utility.
"""
return class_name in self.ALLOWED_OPERATIONS.keys()
def check_operation_allowed(self, called_method):
"""
Checks whether operation is allowed or not.
By CPanel UAPI supports several operations
but only few of them can be called via this utility.
"""
class_name, method_name = self.extract_class_and_method(called_method)
if self.check_class_allowed(class_name):
return self.check_method_allowed(method_name, self.ALLOWED_OPERATIONS[class_name])
return False
@staticmethod
def extract_class_and_method(called_method):
try:
class_name, method_name = called_method.split('::')
return class_name, method_name
except ValueError:
exit_with_error('Invalid method is passed: {}'.format(called_method))
def prepare_running_command(self, called_method, params, return_json):
class_name, method_name = self.extract_class_and_method(called_method)
params = [self.COMMAND, class_name, method_name] + params
if return_json:
params.append(self.RETURN_JSON)
return params
def parse_result(self, called_method, result):
class_name, method_name = self.extract_class_and_method(called_method)
ignore_errors = self.get_ignore_errors(method_name, self.ALLOWED_OPERATIONS[class_name])
self.check_for_errors(result, ignore_errors)
parser = self.get_method_parser(method_name, self.ALLOWED_OPERATIONS[class_name])
if parser is not None:
return parser(result['result']['data'])
return result['result']['data']
def check_for_errors(self, result, ignore_errors = False):
errors = result['result'].get('errors')
if not errors:
return
for error in errors:
# TODO: The next line is workaround of cPanel issue CPANEL-35122
# see https://support.cpanel.net/hc/en-us/articles/1500004317181-PHP-Selector-change-to-CloudLinux-PHP-version-reports-a-false-error
if error in UAPI_IGNORED_ERRORS:
continue
exit_with_error(' '.join(errors), ignore_errors=ignore_errors)
# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import copy
import sys
import json
import argparse
import socket
import base64
import os
import subprocess
import re
from enum import Enum
from dataclasses import dataclass
from typing import Callable, List, Optional
from past.builtins import basestring, unicode # noqa
from future.utils import iteritems
from clcommon.utils import silence_stdout_until_process_exit, get_cl_version, is_ubuntu
from cllicense import CloudlinuxLicenseLib
from cpanel_api import get_cpanel_api_class
from clcommon.lib.cledition import is_cl_solo_edition, is_container
from clcommon.cpapi import is_hitting_max_accounts_limit, get_main_username_by_uid
LVEMANAGER_PLUGIN_NAMES = {
'python_selector': 'Python Selector',
'nodejs_selector': 'Node.js Selector',
'php_selector': 'PHP Selector',
'resource_usage': 'Resource Usage',
'wpos': 'AccelerateWP'
}
PASSENGER_DEPEND_PLUGINS = ['python_selector', 'nodejs_selector']
DEFAULT_PLUGIN_NAME = 'CloudLinux Manager'
CAGEFS_ENTER_PROXIED_BIN = '/usr/bin/cagefs_enter.proxied'
if not os.path.exists(CAGEFS_ENTER_PROXIED_BIN):
CAGEFS_ENTER_PROXIED_BIN = '/bin/cagefs_enter.proxied'
def is_json(data):
try:
json.loads(data)
return True
except ValueError as error:
return False
class CloudlinuxCliBase(object):
request_data = {}
result = None
available_request_params = [
'owner', 'command', 'method', 'params', 'user_info', 'mockJson', 'attachments', 'plugin_name', 'lang'
]
NOT_FLAGGED_PARAMS = ['config-files', 'content', 'passenger-log-file', 'ignore-list', 'wp-path', 'upgrade-url']
license_is_checked = False
current_plugin_name = ''
licence = CloudlinuxLicenseLib()
def __init__(self):
self.skip_cagefs_check = False
self.user_info = {}
self.parsing_request_data()
self.check_xss()
self.drop_permission()
self.command_methods = {
'spa-ping': self.spa_ping,
'cloudlinux-top': self.cl_top,
'cloudlinux-selector': self.cl_selector,
'cloudlinux-statistics': self.cl_statistics,
'cloudlinux-charts': self.cl_chart,
'cloudlinux-quota': self.cl_quota,
'cpanel-api': self.cpanel_api,
'cloudlinux-xray-user-manager': self.cl_xray_user_manager,
'cloudlinux-statsnotifier': self.cl_statsnotifier,
'cloudlinux-awp-user': self.cloudlinux_awp_user,
'cl-smart-advice-user': self.cl_smart_advice_user,
'cl-install-plugin': self.cl_install_plugin
}
def check_xss(self):
for key in self.request_data.keys():
if key not in self.available_request_params:
self.exit_with_error('BAD REQUEST 1:' + key)
for name, val in iteritems(self.request_data):
if isinstance(val, dict): # if post key is "params"
for key, inner_value in iteritems(val):
self.check_param_key(key)
if self.request_data['command'] == 'cloudlinux-packages' \
and name == 'params' \
and key == 'package':
self.request_data[name][key] = self.escape_param_value(inner_value)
elif self.request_data['command'] == 'cloudlinux-support':
pass
elif self.request_data['command'] == 'cloudlinux-selector' \
and name == 'params' \
and key == 'options':
pass
elif self.request_data['command'] == 'lvectl' \
and name == 'params' \
and key == 'stdin':
pass
elif self.request_data['command'] == 'cloudlinux-selector' \
and name == 'params' \
and key == 'env-vars':
pass
elif self.request_data['command'] == 'cloudlinux-xray-manager' \
and name == 'params' \
and key == 'url':
pass
elif self.request_data['command'] == 'cloudlinux-xray-user-manager' \
and name == 'params' \
and key == 'url':
pass
elif self.request_data['command'] == 'wmt-api' \
and name == 'params' \
and key == 'config-change':
pass
elif self.request_data['command'] == 'cloudlinux-xray-manager' \
and name == 'params' \
and key == 'email':
pass
elif self.request_data['command'] == 'cloudlinux-awp-admin' \
and name == 'params' \
and key == 'upgrade-url':
pass
else:
self.check_param_value(inner_value)
else:
self.check_param_value(val)
def get_env(self):
"""
Get env for subprocess call
"""
env_copy = os.environ.copy()
if self.request_data.get('lang'):
lang = self.request_data.get('lang')
if not re.match(r'^[a-z]{2}$', lang):
lang = 'en'
env_copy['LC_ALL'] = lang
return env_copy
def get_server_ip(self):
"""
Get the server's IP address.
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception as e:
return None
def check_param_key(self, key):
if not re.search('^[\w\-]+$', key):
self.exit_with_error('BAD REQUEST 2')
def check_param_value(self, val):
if isinstance(val, basestring):
if re.search('[`\|\$;&\n]', val, re.M):
self.exit_with_error('BAD REQUEST 3')
def escape_param_value(self, val):
chars = "\\\"\'"
for c in chars:
val = val.replace(c, "\\" + c)
return val
def main(self):
command = self.request_data['command']
endpoint = self.command_methods.get(command)
allowed_methods = ['cloudlinux-license', 'external-info', 'spa-get-user-info'] # not requires license check
if endpoint:
if not self.license_is_checked and command not in allowed_methods:
self.check_license()
if 'mockJson' in self.request_data:
self.spa_mock(self.request_data['mockJson'])
endpoint()
else:
if command:
self.exit_with_error("No such module " + command)
else:
self.exit_with_error("Command not defined")
def parsing_request_data(self):
"""
parsing entry data, encode it from base64 to dictionary
:return:
"""
parser = argparse.ArgumentParser()
parser.add_argument('--data')
parser.add_argument('--skip-cagefs-check', action='store_true', default=False)
try:
arguments = parser.parse_args()
except:
self.exit_with_error("Unknown param in request")
if arguments.data:
data_in_base64 = arguments.data
data_in_json = base64.b64decode(data_in_base64).decode("utf-8")
try:
self.request_data = json.loads(data_in_json)
self.skip_cagefs_check = arguments.skip_cagefs_check
except ValueError:
self.exit_with_error("Need json-array")
self.user_info = self.get_user_info()
self.define_current_plugin()
else:
self.exit_with_error("No --data param in request")
def get_user_info(self):
user_info = self.request_data.get('user_info') or {}
if self.request_data['owner'] == 'user' and any(value is None for value in user_info.values()):
euid = os.geteuid()
username = get_main_username_by_uid(euid)
user_info = {'username': username, 'lve-id': euid}
return user_info
def cl_top(self):
# This imports from other package (cagefs), so we turn off pylint import checker for this line
from lvestats.lib.info.cloudlinux_top import CloudLinuxTop #pylint: disable=E0401
import lvestats.lib.config as config #pylint: disable=E0401
list_to_request = self.prepair_params_for_command()
result = ''
try:
result, exitcode = CloudLinuxTop(config.read_config()).main(*list_to_request)
except config.ConfigError as ce:
ce.log_and_exit()
self.exit_with_error(str(ce))
if self.request_data.get('owner') == 'user':
json_result = {}
try:
json_result = json.loads(result)
except:
self.exit_with_error(result)
if json_result.get('result') != 'success':
self.exit_with_error(json_result.get('result'), json_result.get('context'), ignore_errors=True)
print(result)
silence_stdout_until_process_exit()
sys.exit(exitcode)
def cl_quota(self):
list_to_request = self.prepair_params_for_command()
result = self.run_util('/usr/bin/cl-quota', *list_to_request, ignore_errors=True)
print(result)
def cl_xray_user_manager(self):
list_to_request = self.prepair_params_for_command()
list_to_request.remove("--json")
result = self.run_util('/opt/alt/php-xray/cloudlinux-xray-user-manager', *list_to_request, ignore_errors=False)
print(result)
def cl_smart_advice_user(self):
cli_command = '/opt/alt/php-xray/cl-smart-advice-user'
list_to_request = self.prepair_params_for_command(with_json=False)
# Workaround to run the command in background
if '--async' in list_to_request:
subprocess.Popen([cli_command, *list_to_request], stdin=subprocess.PIPE,stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
self.exit_with_success()
result = self.run_util(cli_command, *list_to_request, ignore_errors=False)
print(result)
def cpanel_api(self):
owner = self.request_data.get('owner')
method = self.request_data.pop('method')
list_to_request = self.prepair_params_for_command(with_json=False, add_dash=False)
cpanel_api = get_cpanel_api_class(owner)
self.exit_with_success({'data': cpanel_api.run(method, list_to_request)})
def cl_chart(self):
list_to_request = self.prepair_params_for_command()
try:
list_to_request.remove("--json")
except ValueError:
pass
for param in list_to_request:
if param.startswith('--output'):
self.exit_with_error('BAD REQUEST 2')
list_to_request.insert(0, '/usr/sbin/lvechart')
response = subprocess.check_output(list_to_request, shell=False, text=True)
print(json.dumps({"result": "success", "chart": response}))
silence_stdout_until_process_exit()
sys.exit(0)
def drop_permission(self):
"""
Drop permission to users, if owner of script is user
:return:
"""
data = self.request_data
if data['owner'] in ['reseller', 'user'] and\
('lve-id' not in self.user_info or
'username' not in self.user_info):
self.exit_with_error("User id does not specified")
def prepair_params_for_command(self, with_json=True, escaped_strings=False, add_dash=True):
"""
Method that converts given dict of parameters
into list of strings that should be passed
as arguments command-line application
:param with_json: add --json argument
:param escaped_strings: ONLY FOR BACKWARDS COMPATIBILITY!
SHOULD BE False FOR ALL NEW METHODS!
:param add_dash: if we need to add dashes to params
:return:
"""
value_template = "--{0}={1}" if add_dash else "{0}={1}"
data = copy.deepcopy(self.request_data)
list_to_request = []
if "method" in data:
for method in data["method"].split(' '):
list_to_request.append(method)
if "params" not in data:
data['params'] = {}
if "json" not in data['params'] and with_json:
data['params']['json'] = ''
for param, value in iteritems(data['params']):
if param != 'additional-params':
# TODO: looks like we can remove option escaped_strings
# and always use value.encode('utf-8') here
# same goal may be reached using utils.byteify(json.loads(...))
# but to do that, we need some tests covering unicode params
# (especially for cloudlinux-packages)
# unfortunately, we do not have one ;(
# THIS IS NEEDED ONLY FOR CL-PACKAGES UTILITY
if value and escaped_strings is True:
list_to_request.append(value_template.format(param, value.encode('unicode-escape').decode()))
elif (value or param in self.NOT_FLAGGED_PARAMS) and escaped_strings is False:
list_to_request.append(value_template.format(param, value))
else:
list_to_request.append("--{0}".format(param))
if self.request_data['owner'] == 'reseller':
list_to_request.append('--for-reseller={0}'.format(self.user_info['username']))
if 'additional-params' in data['params'] \
and data['params']['additional-params'] != '':
list_to_request.append("--")
for param in data['params']['additional-params'].split():
list_to_request.append("{0}".format(param))
return list_to_request
def is_edition_migration_available(self):
# check if edition migration is supported
return os.path.isfile('/usr/sbin/clncheck')
def update_license(self):
# Register by broken license
with open(os.devnull, 'w') as devnull:
clnreg_cmd = ['/usr/sbin/clnreg_ks', '--force']
if self.is_edition_migration_available():
clnreg_cmd.append('--migrate-silently')
subprocess.call(clnreg_cmd, stderr=devnull, stdout=devnull, shell=False)
subprocess.call(['/usr/bin/cldetect', '--update-license'], stderr=devnull, stdout=devnull, shell=False)
self.check_license(False)
def check_license(self, with_recovery=True):
if not self.kernel_is_supported():
if self.request_data['owner'] in ['reseller']:
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
elif self.request_data['owner'] in ['admin']:
self.exit_with_error('Kernel is not supported')
if is_hitting_max_accounts_limit():
if self.request_data['owner'] == 'admin':
self.exit_with_error('ERROR.hitting_max_accounts_limit')
if self.request_data['owner'] == 'user':
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
if not self.licence.get_license_status():
if self.request_data['owner'] in ['reseller', 'user']:
interpreter = 'nodejs'
if self.request_data.get('params') \
and self.request_data['params'].get('interpreter'):
interpreter = self.request_data['params']['interpreter']
pluginNames = {
'reseller': 'CloudLinux Manager',
'user': {'python': 'Python Selector', 'nodejs':'Node.js Selector'}
.get(interpreter, 'Node.js Selector')
}
self.exit_with_error(
code=503,
error_id='ERROR.not_available_plugin',
context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)},
icon='disabled')
else:
if with_recovery:
self.update_license()
else:
self.exit_with_error('License is not valid')
else:
self.license_is_checked = True
def exit_with_error(self, error_string='', context=None,
code=None, error_id=None, icon=None, ignore_errors=False):
result = {"result": error_string}
if context:
result['context'] = context
if code:
result['code'] = code
if error_id:
result['error_id'] = error_id
if icon:
result['icon'] = icon
if ignore_errors:
result['ignore'] = ignore_errors
print(json.dumps(result))
sys.exit(1)
def exit_with_success(self, response=None):
data = copy.deepcopy(response) if response else {}
data['result'] = 'success'
print(json.dumps(data))
sys.exit(0)
def cl_statistics(self):
# This imports from other package (cagefs), so we turn off pylint import checker for this line
from lvestats.lib.cloudlinux_statistics import main #pylint: disable=E0401
import lvestats.lib.config as config #pylint: disable=E0401
from lvestats.lib.dbengine import make_db_engine #pylint: disable=E0401
list_to_request = self.prepair_params_for_command()
try:
cnf = config.read_config()
dbengine = make_db_engine(cnf)
main(dbengine, argv_=list_to_request,server_id=cnf.get('server_id', 'localhost'))
silence_stdout_until_process_exit()
sys.exit(0)
except config.ConfigError as ce:
ce.log_and_exit()
self.exit_with_error(ce)
def spa_mock(self, file):
file_path = '/usr/share/l.v.e-manager/spa/src/jsons/%s.json' % (file)
# check if passed file param doesn't use relative path. E.g.: '../../file'
if os.path.realpath(file_path) != file_path:
self.exit_with_error('BAD REQUEST 3')
with open(file_path, 'r') as f:
print(f.read())
sys.exit(0)
def get_lve_version(self):
try:
ver = subprocess.check_output(
'cat /proc/lve/list | grep -Po \'^\d{1,2}:\'',
shell=True, executable='/bin/bash', text=True
).strip()
return int(ver[:-1])
except:
return 0
def get_cloudlinux_version(self):
return subprocess.check_output(
'uname -r | grep -Po \'el\d\w?\'',
shell=True, executable='/bin/bash', text=True
).strip()
# Common methods
def spa_ping(self):
self.exit_with_success()
def cl_selector(self):
try:
from clselector.cl_selector import CloudlinuxSelector
except:
self.exit_with_error('Module unavailable')
if self.user_info.get('username') and 'interpreter' in self.request_data['params']\
and self.request_data['params']['interpreter'] == 'php':
self.check_php_selector_user_availablility()
list_to_request = self.prepair_params_for_command()
cll = CloudlinuxSelector()
cll.run(list_to_request)
def check_php_selector_user_availablility(self):
"""
Additional check only for php selector
:return:
"""
try:
LIBDIR = '/usr/share/cagefs'
sys.path.append(LIBDIR)
import cagefsctl
if not cagefsctl.cagefs_is_enabled or \
not cagefsctl.is_user_enabled(self.user_info['username']):
raise RuntimeError('Cagefs is disabled or missing')
except (ImportError, RuntimeError):
self.exit_with_error(
code=503,
error_id='ERROR.cagefsDisabled',
)
from clselect.clselectexcept import BaseClSelectException
try:
from clselect import ClSelect
ClSelect.check_multiphp_system_default_version()
except (BaseClSelectException):
self.exit_with_error(
code=503,
error_id='ERROR.systemVersionAltPHP',
)
def define_current_plugin(self):
self.current_plugin_name = self.request_data.get('plugin_name')
def is_error_response_default(self, json_result):
return json_result.get('result') != 'success' and json_result.get('success') != 1
def run_util(self, name, *args, **kwargs):
command = [name] + list(args)
error_checker = kwargs.get('error_checker', self.is_error_response_default)
try:
p = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
env=self.get_env())
(result, err) = p.communicate(kwargs.pop('stdin', None))
is_error = p.returncode != 0 or not is_json(result)
if not is_error:
json_result = json.loads(result)
is_error = error_checker(json_result)
if is_error:
result = result + err
if is_json(result):
json_result = json.loads(result)
if json_result.get('message'):
json_result['result'] = json_result.pop('message')
result = json.dumps(json_result)
if kwargs.get("ignore_errors", False):
# Check new result concatenated with error
if is_json(result):
result = json.loads(result)
result['ignore'] = True
result = json.dumps(result)
else:
result = self.ignored_error_message(result)
print(result)
exit(1)
return result
except Exception as e:
self.exit_with_error("Can't run %(command)s", context={'command': ' '.join(command)}, ignore_errors=True)
def ignored_error_message(self, message):
return json.dumps({
"result": message,
"ignore": True
})
def kernel_is_supported(self):
try:
if is_container():
return True
if is_cl_solo_edition(skip_jwt_check=True):
# Ubuntu uses standard kernel,
# CL9 uses Alma kernel which doesn't have 'lve' in its name
if is_ubuntu() or get_cl_version() == 'cl9':
return True
uname = subprocess.check_output('uname -r', shell=True, executable='/bin/bash', text=True)
return 'lve' in uname
else:
f = open('/proc/lve/list', 'r')
line = f.readline()
f.close()
return bool(line)
except IOError:
return False
def cl_statsnotifier(self):
from lvestats.lib.cloudlinux_statsnotifier import main #pylint: disable=E0401
list_to_request = self.prepair_params_for_command(with_json=True)
exit_code = main(args_=list_to_request) #pylint: disable=E0401
sys.exit(exit_code)
def cloudlinux_awp_user(self):
cli_command = '/usr/bin/cloudlinux-awp-user'
list_to_request = self.prepair_params_for_command(with_json=False)
result = self.run_util(cli_command, *list_to_request, ignore_errors=False)
print(result)
def cl_install_plugin(self):
"""
This method is needed just for dev server to allow work with mocks
"""
self.exit_with_success()
# user cli parts
class CommandType(Enum):
HEAVY = 'heavy'
SIMPLE = 'simple'
class ConfigLimitValue(Enum):
ALL = 'all' # limit all requests
HEAVY = 'heavy' # don't limit white-listed 'simple' requests
UNLIMITED = 'unlimited' # don't limit at all
@classmethod
def _missing_(cls, value):
return cls.ALL
@dataclass
class Rule:
callable: Callable
result: CommandType
class LimitStrategyBase:
"""
Base limits strategy to decide - run incoming request with or without cagefs limits
Strategy execution means that used script (cloudlinux_cli_user.py) will be re-executed with (or not)
additional cagefs flags
"""
cagefs_args: List[str]
def execute(self, command: str, args: List[str], request_data: dict) -> Optional[int]:
full_command = self.get_full_command(command, args, request_data)
p = subprocess.Popen(full_command)
p.communicate()
return p.returncode
def get_full_command(self, command: str, args: List[str], request_data: dict) -> List[str]:
cmd = [*sys.argv, f'--skip-cagefs-check']
return [CAGEFS_ENTER_PROXIED_BIN, *self.cagefs_args, *cmd]
class NoCagefsStrategy(LimitStrategyBase):
"""
Strategy for hardcoded commands, that should always run even without the cagefs with unknown reason
This strategy does not re-executes the script with `cagefs_enter.proxied`, just letting them to finish as is
TODO: LVEMAN-1767
"""
def execute(self, *args, **kwargs) -> Optional[int]:
return None
class AllLimitStrategy(LimitStrategyBase):
"""
Strategy to limit all commands
"""
cagefs_args = []
class NoLimitStrategy(LimitStrategyBase):
"""
Strategy to don't limit all commands
"""
cagefs_args = ['--no-io-and-memory-limit', '--no-cpu-limit', '--no-max-enter']
class LimitStrategyHeavy(LimitStrategyBase):
"""
Strategy to don't limit whitelisted commands
By default - all commands are HEAVY and will be limited
Add `rules` to mark any command as SIMPLE and run without limits
"""
cagefs_args = []
default_rule = Rule(callable=lambda args: True, result=CommandType.HEAVY)
rules = {
'cloudlinux-selector': [
Rule(callable=lambda args: 'get' in args, result=CommandType.SIMPLE),
Rule(callable=lambda args: 'start' in args, result=CommandType.SIMPLE),
Rule(callable=lambda args: 'restart' in args, result=CommandType.SIMPLE),
Rule(callable=lambda args: 'stop' in args, result=CommandType.SIMPLE),
]
}
def _check_rules(self, command: str, args: List[str]) -> CommandType:
command_type = None
for rule in self.rules.get(command, []) + [self.default_rule]:
if rule.callable(args):
command_type = rule.result
break
if command_type == CommandType.SIMPLE:
self.cagefs_args = ['--no-io-and-memory-limit', '--no-cpu-limit', '--no-max-enter']
else:
self.cagefs_args = []
return command_type
def get_full_command(self, command: str, args: List[str], request_data: dict) -> List[str]:
self._check_rules(command, args)
return super().get_full_command(command, args, request_data)
#!/opt/cloudlinux/venv/bin/python3 -sbb
# coding:utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from cloudlinux_cli_user import CloudlinuxCliUser
if __name__ == "__main__":
cloudlinux_cli = CloudlinuxCliUser()
cloudlinux_cli.main()
#!/bin/bash
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
if [[ $EUID -eq 0 ]]; then
echo "This program is not intended to be run as root." 1>&2
exit 1
fi
CWD=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source ${CWD}/activate
CL_PYTHON_VERSION="$(echo "${VIRTUAL_ENV}" | awk -F '/' '{print $NF}')"
eval $(${CWD}/set_env_vars.py python)
ABSOLUTE_PATH="${CWD}/python${CL_PYTHON_VERSION}_bin"
exec "${ABSOLUTE_PATH}" "$@"
#!/bin/bash
if [[ $EUID -eq 0 ]]; then
echo "This program is not intended to be run as root." 1>&2
exit 1
fi
CWD=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source ${CWD}/activate
eval $(${CWD}/set_env_vars.py nodejs)
exec "${CL_NODEHOME}/usr/bin/node" "$@"#!/bin/bash
if [[ $EUID -eq 0 ]]; then
echo "This program is not intended to be run as root." 1>&2
exit 1
fi
error_msg="Cloudlinux NodeJS Selector demands to store node modules for application in separate folder \
(virtual environment) pointed by symlink called \"node_modules\". That's why application should not contain \
folder/file with such name in application root"
CWD=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source "${CWD}/activate"
eval $(${CWD}/set_env_vars.py nodejs)
app_node_modules="${HOME}/${CL_APP_ROOT}/node_modules"
venv_node_modules="${CL_VIRTUAL_ENV}/lib/node_modules"
nodejs_npm="$CL_NODEHOME/usr/bin/npm"
# install with its aliases and list with its alias without arguments or +args
if [[ "$@" =~ ^(install|i|add|list|la|ll)$ || "$@" =~ ^(install|i|add|list|la|ll)[[:space:]].*$ ]]; then
# We remove old symlink `~/app_root/node_modules` if it exists
if [[ -L "${app_node_modules}" ]]; then
rm -f "${app_node_modules}" || (echo "Can't remove symlink "${app_node_modules} 1>&2 && exit 1)
# We print error end exit 1 if `~/app_root/node_modules` is dir or file
elif [[ -d "${app_node_modules}" || -f "${app_node_modules}" ]]; then
echo "${error_msg}" 1>&2 && exit 1
fi
# we should create venv/node_modules, https://docs.cloudlinux.com/index.html?link_traversal_protection.html
mkdir -p "${venv_node_modules}"
# Create symlink ~/app_root/node_modules to ~/nodevenv/app_root/int_version/lib/node_modules
ln -fs "${venv_node_modules}" "${app_node_modules}"
ln -sf "${HOME}/${CL_APP_ROOT}/package.json" "${CL_VIRTUAL_ENV}/lib/package.json"
exec "${nodejs_npm}" "$@" --prefix="${CL_VIRTUAL_ENV}/lib"
else
exec "${nodejs_npm}" "$@"
fi#!/opt/cloudlinux/venv/bin/python3 -sbb
# -*- coding: utf-8 -*-
# cloudlinux-selector Utility to check Cloudlinux license
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2022 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import sys
from clselector.cl_selector import CloudlinuxSelector
def main(argv):
"""
Main run function
"""
cll = CloudlinuxSelector()
return cll.run(argv)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))