관리-도구
편집 파일: selector_usage_lib.py
# Copyright (c) Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
# from lxml import etree
from copy import deepcopy

import exec_command
from stat_utils import pretty_version_keys, cpanel_whmapi, dump_loaded_modules


class SelectorStatException(Exception):
    """
    Inner exception class
    """
    pass


def cagefs_enabled():
    """
    Check status of cagefs - installed & enabled
    If not - selector is not applied, no need to query its settings
    :return: True if cagefs is installed and enabled, False otherwise
    """
    cagefs_status = ''.join(exec_command.exec_command("/usr/sbin/cagefsctl --cagefs-status")).strip()
    return cagefs_status == 'Enabled'


def get_selector_versions():
    """
    Retrieve versions from selector, EXCEPT native
    :return: selector versions iterator (this function is a generator)
    """
    # query selector versions
    selector_data = ''.join(exec_command.exec_command("/usr/bin/selectorctl --summary --json"))
    try:
        # `or ()`: the key may be absent or null in the decoded document,
        # in which case there is simply nothing to iterate over
        selector_versions = json.loads(selector_data).get("PHPConfiguration") or ()
    except ValueError:
        # no json could be decoded (may be selector is not installed)
        selector_versions = ()
    for item in selector_versions:
        ver = item['version']
        if ver != 'native':
            yield ver


def get_selector_users(version):
    """
    Retrieve selector users for given version
    :param version: selector version, e.g 7.2
    :return: dict user: version, e.g. {user: alt-php72, user1: alt-php72}
    """
    users = ''.join(exec_command.exec_command(
        "/usr/bin/selectorctl --list-users --version={v}".format(v=version)))
    # Strip every token: surrounding whitespace (e.g. a trailing newline of the
    # command output) must neither become part of a user name nor produce a
    # whitespace-only pseudo user.
    stripped_names = (raw_name.strip() for raw_name in users.split(','))
    return dict((u, pretty_version_keys(version, 'alt-php')) for u in stripped_names if u)


def get_selector_usage():
    """
    Retrieve versions set for users in selector
    :return: dict { user: version }, empty if cagefs is not enabled
    """
    if not cagefs_enabled():
        return dict()
    # retrieve users of each selector version
    users_ver = dict()
    for ver in get_selector_versions():
        users_ver.update(get_selector_users(ver))
    return users_ver


def ea4_selector_check(ver_domains_list, domain_user_dict, handlers_mapping):
    """
    Check EA4 selector settings. Statistics per handler is inspected.
    Selector on EA4 does not change handler, it changes version only.
    Selector version is applied only in case of all the following circumstances:
    - suexec_module is enabled
    - system default version is not alt-php
    - selector setting is not native
    :param ver_domains_list: set of domains per version
    :param domain_user_dict: domain--user correspondence
    :param handlers_mapping: per handler statistics
    :return: per handler statistics checked against selector settings;
             `handlers_mapping` itself is returned when selector is not applicable,
             otherwise a modified deep copy of it
    """
    # ## 1. check if selector is applicable
    default_ver = cpanel_whmapi('php_get_system_default_version').get('version')
    if not default_ver or ver_domains_list.get(default_ver) is None:
        # default version is unknown, or there are no domains using it -
        # this means there are no domains to apply selector settings to
        return handlers_mapping
    # system default version must not be alt-php and suexec must be installed
    if 'alt-php' in default_ver or 'suexec_module' not in dump_loaded_modules():
        return handlers_mapping
    # ## 1 finish

    # ## 2. get user: dom_list
    # this is domain_user_dict input, which should be inverted
    user_domains = ea4_revert_domains_user_struct(domain_user_dict)
    # ## 2 finish

    # ## 3. get sel_ver: dom_list
    s_ver_domains = get_selector_domains(user_domains)
    # ## 3 finish

    # ## 4. aggregation: need to inspect system default domains and selector domains against each other
    # matcher() works in place, so hand it a copy instead of the caller's set
    default_domains = deepcopy(ver_domains_list.get(default_ver))
    default_domains_matched, s_ver_matched = matcher(default_domains, s_ver_domains)
    # ## 4 finish

    # ## 5. merge; need to update stat per handler according to matched structures:
    # selector does not change handler!
    handlers = deepcopy(handlers_mapping)
    for versions in handlers.values():
        if default_ver in versions:
            # default version belongs to handler, update whole structure for handler according to matched data
            versions[default_ver] = default_domains_matched
            versions.update(merger(versions, s_ver_matched))
    return handlers
    # ## 5 finish


def ea4_revert_domains_user_struct(domains_user_dict):
    """
    Revert structure `domain`: `user` to structure `user`: set of domains
    :param domains_user_dict: dict of domains per user correspondence
    :return: dict user: set_of_domains
    """
    user_domains = dict()
    for domain, user in domains_user_dict.items():
        user_domains.setdefault(user, set()).add(domain)
    return user_domains


def get_selector_domains(domains_of_user):
    """
    Retrieve domains set for selector versions
    :param domains_of_user: `user`: set of domains structure
    :return: dict selector_ver: set of domains
    """
    selector_users = get_selector_usage()  # user: version
    selector_versions = dict()
    for user, s_ver in selector_users.items():
        # users unknown to `domains_of_user` still create an (empty) entry for their version
        selector_versions.setdefault(s_ver, set()).update(domains_of_user.get(user, set()))
    return selector_versions


def matcher(system_default_api_domains, selector_versions):
    """
    Match domains sets between system default version and selector versions:
    - for system default version domains get difference with selector domains
    - for selector domains get intersection with system default version
    Common for cPanel and Plesk
    NOTE: both arguments are modified IN PLACE and the very same objects are returned;
    pass copies if the originals must stay intact
    :param system_default_api_domains: set of domains using system default version
    :param selector_versions: dict selector_ver: set of domains
    :return: tuple(clean_default_domains, clean_selector_domains)
    """
    for ver, dom_list in selector_versions.items():
        if ver == 'native':
            # `native` entries are left untouched
            continue
        # keep only selector domains which really use the system default version ...
        dom_list.intersection_update(system_default_api_domains)
        # ... and take exactly those domains away from the system default version
        system_default_api_domains.difference_update(dom_list)
    return system_default_api_domains, selector_versions


def merger(clean_api_versions, clean_selector_versions):
    """
    Merge matched structures of domains (common for cPanel and Plesk)
    :param clean_api_versions: API_version: set of domains structure after match
    :param clean_selector_versions: selector_version: set of domains structure after match
    :return: unified structure version: domains_set; every value is a newly created set,
             so the result shares no set objects with the arguments
    """
    # get the union of two dicts -- there might be different keys in either;
    # plain iteration (instead of dict(a, **b)) also accepts non-string keys
    merged = dict()
    for source in (clean_api_versions, clean_selector_versions):
        for ver, dom_list in source.items():
            # union values (sets of domains) for each version
            merged.setdefault(ver, set()).update(dom_list)
    return merged


def plesk_selector_check(ver_domains_list, custom_ver):
    """
    Check Plesk selector settings.
    Selector version is applied only to LSPHP by vendor OS handlers, e.g. x-httpd-lsphp-custom
    :param ver_domains_list: list of domains per version, including `custom` handler version;
                             it is not modified
    :param custom_ver: real version, to which Plesk resolves `custom` handler
    :return: set of domains per version checked against selector settings
    """
    # ## 1. check if selector is applicable
    if ver_domains_list.get('custom'):
        # ## 2. get user: dom_list
        user_domains = plesk_get_user_domains()
        # ## 2 finish
        # ## 3. get sel_ver: dom_list
        s_ver_domains = get_selector_domains(user_domains)
        # ## 3 finish
        # ## 4. aggregation: need to inspect custom domains and selector domains against each other
        # dicts and sets are mutable, so it's better not to modify original structure
        api_ver_list = deepcopy(ver_domains_list)
        custom_domains_matched, s_ver_matched = matcher(api_ver_list.get('custom', set()), s_ver_domains)
        api_ver_list['custom'] = custom_domains_matched
        # ## 4 finish
        # ## 5. merge; need to get one whole structure from two different, previously inspected
        merged = merger(api_ver_list, s_ver_matched)
        # ## 5 finish
    else:
        # there are no domains using custom handler - this means there are no domains to apply selector settings to;
        # plesk_manage_custom() edits its argument in place, so work on a copy here as well
        merged = deepcopy(ver_domains_list)
    # clean `custom` field
    return plesk_manage_custom(merged, custom_ver)


def plesk_get_user_domains():
    """
    Find `user`: `domains` correspondence for Plesk
    :return: dict(`user`: set of domains)
    """
    return merger(plesk_get_domains_under_new_subsciption(), plesk_get_domains_under_existing_subsciption())


def plesk_get_domains_under_existing_subsciption():
    """
    Get users' domains, added under existing subscription
    :return: dict(user: set_of_domains)
    """
    return plesk_query_db('select login,name from domains inner join Subscriptions as sc on domains.webspace_id=sc.object_id inner join sys_users as u on sc.id=u.id where domains.webspace_id!=0 and sc.object_type="domain"')


def plesk_get_domains_under_new_subsciption():
    """
    Get users' domains, added under new subscription
    :return: dict(user: set_of_domains)
    """
    return plesk_query_db('select login,name from domains inner join Subscriptions as sc on domains.id=sc.object_id inner join sys_users as u on sc.id=u.id where domains.webspace_id=0 and sc.object_type="domain"')


def plesk_query_db(query):
    """
    Query Plesk database for user:domain correspondence and get result in XML
    :param query: SQL statement selecting `login` and `name` columns; it is pasted into
                  a single-quoted command line argument as is, so it must come from
                  trusted code and must not contain single quotes
    :return: dict(user: set_of_domains)
    :raises SelectorStatException: if the command output cannot be parsed as XML
    """
    # NOTE(review): lxml is imported lazily - presumably it is needed on Plesk hosts only; confirm
    from lxml import etree
    user_domains = dict()
    xml_result = exec_command.exec_command("plesk db '{q}' --xml".format(q=query))
    try:
        root = etree.fromstring(''.join(xml_result))
        for row in root.iter('row'):
            domain_name = row.xpath("field[@name='name']")[0].text
            user_name = row.xpath("field[@name='login']")[0].text
            user_domains.setdefault(user_name, set()).add(domain_name)
    except (etree.XMLSyntaxError, etree.ParseError):
        raise SelectorStatException("Failed to parse XML from plesk db output: {0}".format(xml_result))
    return user_domains


def plesk_manage_custom(versions_mapping, real_version):
    """
    Unify `custom` domains with `real_version` domains,
    clear `custom` key from resulting `version`: `set of domains` mapping
    NOTE: a non-empty `versions_mapping` is modified IN PLACE and returned
    :param versions_mapping: final merged structure of domains
    :param real_version: version, to which Plesk resolves custom
    :return: final mapping with `custom` domains unified with `real_version` domains
    """
    if versions_mapping:
        real_version_domains = versions_mapping.get(real_version, set())
        # pop() reads and removes `custom` in one step, missing key is fine
        real_version_domains.update(versions_mapping.pop('custom', set()))
        versions_mapping[real_version] = real_version_domains
    return versions_mapping


def da_selector_check(primary_ver_domains, domains_per_user, primary_php):
    """
    Check DA selector settings
    :param primary_ver_domains: set of domains using primary version, e.g. php1_release
    :param domains_per_user: user--domain correspondence
    :param primary_php: php version set in options.conf as php1_ver, only this version is affected by selector
    :return: set of domains per primary version checked against selector settings
    """
    # ## 1. check if selector is applicable
    if not primary_ver_domains:
        # no domains use primary version - nothing to apply selector settings to
        return {primary_php: primary_ver_domains}
    # ## 2. get user: dom_list
    # already have it as domains_per_user
    # ## 2 finish
    # ## 3. get sel_ver: dom_list
    s_ver_domains = get_selector_domains(domains_per_user)
    # ## 3 finish
    # ## 4. aggregation: need to inspect primary php domains and selector domains against each other
    # dicts and sets are mutable, so it's better not to modify original structure
    api_ver_list = deepcopy(primary_ver_domains)
    default_domains_matched, s_ver_matched = matcher(api_ver_list, s_ver_domains)
    # ## 4 finish
    # ## 5. merge; need to get one whole structure from two different, previously inspected
    return merger({primary_php: default_domains_matched}, s_ver_matched)
    # ## 5 finish