# Source code for honeycomb.servicemanager.registration

# -*- coding: utf-8 -*-
"""Honeycomb service manager."""

from __future__ import unicode_literals, absolute_import

import os
import sys
import json
import logging
import platform
import importlib

import six

from honeycomb.defs import NAME, LABEL, CONFIG_FILE_NAME, DEPS_DIR
from honeycomb.utils import config_utils
from honeycomb.exceptions import ConfigFileNotFound
from honeycomb.decoymanager.models import AlertType
from honeycomb.servicemanager import defs
from honeycomb.servicemanager.models import ServiceType, OSFamilies
from honeycomb.servicemanager.exceptions import ServiceNotFound, UnsupportedOS

logger = logging.getLogger(__name__)


def get_service_module(service_path):
    """Add custom paths to sys.path and import the service module.

    The module imported is ``<name>.<name>_service``, where ``<name>`` is the
    last component of the resolved ``service_path``.

    :param service_path: Path to service folder
    :returns: The module object returned by :func:`importlib.import_module`
    """
    # Resolve once up front: os.path.basename() yields "" for a path with a
    # trailing separator (and "." for "."), which would turn the module name
    # below into "._service" and make the import fail.
    service_path = os.path.realpath(service_path)

    # add custom paths so imports would work
    paths = [
        os.path.dirname(__file__),  # this folder, to catch base_service
        os.path.realpath(os.path.join(service_path, "..")),  # service's parent folder for import
        service_path,  # service's folder for local imports
        os.path.realpath(os.path.join(service_path, DEPS_DIR)),  # deps dir
    ]

    for path in paths:
        path = os.path.realpath(path)
        logger.debug("adding %s to path", path)
        # Drop any identical entry first so repeated calls do not keep growing
        # sys.path; the entry still ends up at the front, so precedence is the
        # same as a plain insert.
        while path in sys.path:
            sys.path.remove(path)
        sys.path.insert(0, path)

    # get our service class instance
    service_name = os.path.basename(service_path)
    module = ".".join([service_name, service_name + "_service"])
    logger.debug("importing %s", module)
    return importlib.import_module(module)
def register_service(package_folder):
    """Register a honeycomb service.

    Loads the service's JSON config file from ``package_folder``, runs the
    service, alert, platform and parameter validations in order, then builds
    the service object and attaches its alert types.

    :param package_folder: Path to folder with service to load
    :returns: Validated service object
    :rtype: :class:`honeycomb.servicemanager.models.ServiceType`
    :raises ServiceNotFound: if the resolved folder does not exist
    :raises ConfigFileNotFound: if the folder has no config file
    :raises UnsupportedOS: if the service does not support the current platform
    """
    logger.debug("registering service %s", package_folder)

    service_dir = os.path.realpath(package_folder)
    if not os.path.exists(service_dir):
        raise ServiceNotFound(os.path.basename(service_dir))

    config_path = os.path.join(service_dir, CONFIG_FILE_NAME)
    if not os.path.exists(config_path):
        raise ConfigFileNotFound(config_path)

    with open(config_path, "r") as config_file:
        config = json.load(config_file)

    # Validate service and alert config (order matters: top-level sections
    # are checked before the individual sections are inspected)
    config_utils.validate_config(config, defs.SERVICE_ALERT_VALIDATE_FIELDS)

    service_section = config.get(defs.SERVICE_CONFIG_SECTION_KEY, {})
    config_utils.validate_config(service_section, defs.SERVICE_CONFIG_VALIDATE_FIELDS)

    _validate_supported_platform(config)
    _validate_alert_configs(config)
    config_utils.validate_config_parameters(
        config,
        defs.SERVICE_ALLOWED_PARAMTER_KEYS,
        defs.SERVICE_ALLOWED_PARAMTER_TYPES,
    )

    service = _create_service_object(config)
    service.alert_types = _create_alert_types(config, service)
    return service
def _validate_supported_platform(config_json):
    """Return the current platform name if the service config supports it.

    A service declaring the LINUX family is accepted on both the LINUX and
    MACOS platform names; ALL is accepted everywhere.

    :param config_json: Parsed service configuration
    :raises UnsupportedOS: if the declared OS family does not cover this platform
    """
    host_os = platform.system()
    declared_family = config_json[defs.SERVICE_CONFIG_SECTION_KEY][defs.SUPPORTED_OS_FAMILIES]

    unix_like = (OSFamilies.LINUX.name, OSFamilies.MACOS.name)
    is_supported = (
        declared_family == OSFamilies.ALL.name
        or (declared_family == OSFamilies.LINUX.name and host_os in unix_like)
        or (declared_family == OSFamilies.WINDOWS.name and OSFamilies.WINDOWS.name == host_os)
    )
    if not is_supported:
        raise UnsupportedOS(declared_family, host_os)
    return host_os


def _validate_alert_configs(config_json):
    """Validate every entry of the alert section against the alert field rules.

    :param config_json: Parsed service configuration
    """
    for alert_config in config_json[defs.ALERT_CONFIG_SECTION_KEY]:
        config_utils.validate_config(alert_config, defs.ALERT_CONFIG_VALIDATE_FIELDS)


def _create_service_object(config_json):
    """Build a ServiceType from the whitelisted keys of the service section.

    :param config_json: Parsed service configuration
    :returns: ServiceType created from the keys listed in
        ``defs.SERVICE_FIELDS_TO_CREATE_OBJECT``
    """
    service_section = config_json[defs.SERVICE_CONFIG_SECTION_KEY]

    create_kwargs = {}
    for field, value in six.iteritems(service_section):
        # only pass through the fields the ServiceType constructor is meant to get
        if field in defs.SERVICE_FIELDS_TO_CREATE_OBJECT:
            create_kwargs[field] = value

    return ServiceType(**create_kwargs)


def _create_alert_types(config_json, service_type):
    """Build one AlertType per alert entry, each bound to ``service_type``.

    :param config_json: Parsed service configuration
    :param service_type: Service object the alert types belong to
    :returns: List of AlertType objects (empty if there is no alert section)
    """
    return [
        AlertType(name=alert_config[NAME], label=alert_config[LABEL], service_type=service_type)
        for alert_config in config_json.get(defs.ALERT_CONFIG_SECTION_KEY, [])
    ]