Source code for honeycomb.integrationmanager.tasks

# -*- coding: utf-8 -*-
"""Honeycomb integration tasks."""

from __future__ import unicode_literals, absolute_import

import os
import json
import logging
import threading
from time import sleep
from datetime import datetime, timedelta, tzinfo

import click

from honeycomb.defs import ARGS_JSON
from honeycomb.integrationmanager import exceptions
from honeycomb.integrationmanager.defs import (IntegrationTypes, IntegrationAlertStatuses,
                                               SEND_ALERT_DATA_INTERVAL, MAX_SEND_RETRIES)
from honeycomb.integrationmanager.models import IntegrationAlert, ConfiguredIntegration
from honeycomb.integrationmanager.registration import register_integration, get_integration_module

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shared module state: integrations that have been configured, and integration
# alerts that are still waiting to be polled for results.
configured_integrations = []
polling_integration_alerts = []


class _UTC(tzinfo):
    """Fixed-offset ``tzinfo`` implementation representing UTC.

    Used to attach timezone information to datetimes on interpreters where
    ``datetime.timezone`` may not be available.
    """

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        # ``datetime`` in this module is the class, not the module, so
        # ``datetime.timedelta`` does not exist; use ``timedelta`` directly.
        return timedelta(0)

    def tzname(self, dt):
        """Return the timezone name."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight-saving adjustment, which is always zero."""
        return timedelta(0)


def configure_integration(path):
    """Configure and enable an integration.

    Registers the integration found at ``path``, loads its saved arguments from the
    ``ARGS_JSON`` file inside that directory, instantiates the integration module's
    ``IntegrationActionsClass`` with those arguments and appends the resulting
    ``ConfiguredIntegration`` to the module-level ``configured_integrations`` list.

    :param path: Directory of the integration to configure
    :raises click.ClickException: If the arguments file cannot be read or parsed
    """
    integration = register_integration(path)
    try:
        with open(os.path.join(path, ARGS_JSON)) as f:
            integration_args = json.load(f)
    except Exception as exc:
        # Keep the underlying reason in the debug log; the user gets a short hint instead.
        logger.debug(str(exc), exc_info=True)
        raise click.ClickException("Cannot load {} integration args, please configure it first."
                                   .format(os.path.basename(path)))

    click.secho("[*] Adding integration {}".format(integration.name))
    # "args" is a reserved LogRecord attribute: passing it in ``extra`` makes the logging
    # module raise KeyError as soon as DEBUG is enabled, so a non-clashing key is used.
    logger.debug("Adding integration %s", integration.name,
                 extra={"integration": integration.name, "integration_args": integration_args})

    configured_integration = ConfiguredIntegration(name=integration.name, integration=integration, path=path)
    configured_integration.data = integration_args
    configured_integration.integration.module = get_integration_module(path).IntegrationActionsClass(integration_args)

    configured_integrations.append(configured_integration)
def send_alert_to_subscribed_integrations(alert):
    """Dispatch an alert to every configured integration that is valid for it.

    Each matching integration is handled on its own thread, which is started
    immediately and not joined here.
    """
    for subscriber in get_valid_configured_integrations(alert):
        worker = threading.Thread(
            target=create_integration_alert_and_call_send,
            args=(alert, subscriber),
        )
        worker.start()
def get_current_datetime_utc():
    """Return the current UTC time as a datetime carrying the module's UTC tzinfo."""
    naive_now = datetime.utcnow()
    return naive_now.replace(tzinfo=_UTC())
def get_valid_configured_integrations(alert):
    """Return the configured integrations that should receive the given alert.

    An integration qualifies when its type is EVENT_OUTPUT and it either declares
    no supported_event_types (meaning all alert types) or lists the alert's type.

    :returns: A list of relevant integrations
    """
    if not configured_integrations:
        return []

    matching = []
    for candidate in configured_integrations:
        if candidate.integration.integration_type != IntegrationTypes.EVENT_OUTPUT.name:
            continue
        # No declared event types means the integration accepts every alert type.
        if (not candidate.integration.supported_event_types
                or alert.alert_type in candidate.integration.supported_event_types):
            matching.append(candidate)

    return matching
def create_integration_alert_and_call_send(alert, configured_integration):
    """Wrap the alert in a pending IntegrationAlert and pass it on for sending.

    The retry budget is taken from the integration's max_send_retries.
    """
    pending_alert = IntegrationAlert(
        alert=alert,
        configured_integration=configured_integration,
        status=IntegrationAlertStatuses.PENDING.name,
        retries=configured_integration.integration.max_send_retries,
    )
    send_alert_to_configured_integration(pending_alert)
def send_alert_to_configured_integration(integration_alert):
    """Send IntegrationAlert to configured integration.

    Collects the alert's ``__slots__`` fields (except ``alert_type`` and ``service_type``),
    passes them to the integration's ``send_event`` and records the outcome on
    ``integration_alert`` (status, send_time, output_data). Alerts of integrations with
    polling enabled are added to ``polling_integration_alerts``.

    If the alert lacks any of the integration's required_fields it is skipped and the
    IntegrationAlert is left untouched. On ``IntegrationSendEventError`` the send is
    retried after ``SEND_ALERT_DATA_INTERVAL`` seconds until the retry budget (capped at
    ``MAX_SEND_RETRIES``) is exhausted, at which point the status becomes ERROR_SENDING.

    :param integration_alert: The IntegrationAlert to send; updated in place
    """
    try:
        alert = integration_alert.alert
        configured_integration = integration_alert.configured_integration
        integration = configured_integration.integration
        integration_actions_instance = integration.module

        required_fields = integration.required_fields
        if required_fields and not all(hasattr(alert, field) for field in required_fields):
            logger.debug("Alert does not have all required_fields (%s) for integration %s, skipping",
                         required_fields,
                         integration.name)
            return

        exclude_fields = ("alert_type", "service_type")
        alert_fields = {
            field: getattr(alert, field)
            for field in alert.__slots__
            if hasattr(alert, field) and field not in exclude_fields
        }

        logger.debug("Sending alert %s to %s", alert_fields, integration.name)
        # send_event returns a pair; only the first element is used here.
        output_data, _ = integration_actions_instance.send_event(alert_fields)

        # Record the send results before the alert is published to the polling list, so a
        # poller never picks up an alert whose send_time/output_data are not set yet.
        integration_alert.send_time = get_current_datetime_utc()
        integration_alert.output_data = json.dumps(output_data)

        if integration.polling_enabled:
            integration_alert.status = IntegrationAlertStatuses.POLLING.name
            polling_integration_alerts.append(integration_alert)
        else:
            integration_alert.status = IntegrationAlertStatuses.DONE.name
        # TODO: do something with successfully handled alerts? They are all written to debug log file

    except exceptions.IntegrationMissingRequiredFieldError as exc:
        # One placeholder per argument; the original format string had a single %s for two
        # arguments, which makes the logging module fail to format the record.
        logger.exception("Send response formatting for integration alert %s failed. Missing required fields: %s",
                         integration_alert,
                         exc.message)

        integration_alert.status = IntegrationAlertStatuses.ERROR_MISSING_SEND_FIELDS.name

    except exceptions.IntegrationOutputFormatError:
        logger.exception("Send response formatting for integration alert %s failed", integration_alert)

        integration_alert.status = IntegrationAlertStatuses.ERROR_SENDING_FORMATTING.name

    except exceptions.IntegrationSendEventError as exc:
        # Cap the budget at MAX_SEND_RETRIES (original note: making sure we do not exceed
        # celery max retries), then consume one attempt.
        send_retries_left = min(integration_alert.retries, MAX_SEND_RETRIES) - 1
        integration_alert.retries = send_retries_left

        logger.error("Sending integration alert %s failed. Message: %s. Retries left: %s",
                     integration_alert,
                     exc.message,
                     send_retries_left)

        if send_retries_left > 0:
            sleep(SEND_ALERT_DATA_INTERVAL)
            send_alert_to_configured_integration(integration_alert)
        else:
            # Also covers a budget that started at zero or below, which would otherwise
            # leave the alert in its previous status forever.
            integration_alert.status = IntegrationAlertStatuses.ERROR_SENDING.name
def poll_integration_information_for_waiting_integration_alerts():
    """Poll every integration alert that is still waiting for results.

    For each alert in ``polling_integration_alerts``: if more than the integration's
    polling_duration has passed since the alert's send_time, the alert is marked
    ERROR_POLLING and dropped from the list; otherwise it is marked IN_POLLING and
    ``poll_integration_alert_data`` is called for it.
    """
    if not polling_integration_alerts:
        return

    logger.debug("Polling information for waiting integration alerts")

    # Iterate over a snapshot: poll_integration_alert_data() removes finished alerts from
    # polling_integration_alerts, and mutating a list while iterating over it skips items.
    for integration_alert in list(polling_integration_alerts):
        integration = integration_alert.configured_integration.integration
        # NOTE(review): polling_duration is compared with a timedelta below, so it is
        # presumably a timedelta itself - confirm against the integration model.
        polling_duration = integration.polling_duration

        if get_current_datetime_utc() - integration_alert.send_time > polling_duration:
            logger.debug("Polling duration expired for integration alert %s", integration_alert)
            integration_alert.status = IntegrationAlertStatuses.ERROR_POLLING.name
            # An expired alert is final; keeping it would re-check it on every call and
            # let the list grow without bound.
            try:
                polling_integration_alerts.remove(integration_alert)
            except ValueError:
                pass  # already removed elsewhere
        else:
            integration_alert.status = IntegrationAlertStatuses.IN_POLLING.name

            poll_integration_alert_data(integration_alert)
def poll_integration_alert_data(integration_alert):
    """Ask the integration for updates on a single waiting IntegrationAlert.

    The alert's stored output_data is decoded from JSON and passed to the integration's
    ``poll_for_updates``. On success the alert is marked DONE, its output_data is replaced
    with the new result and it is removed from ``polling_integration_alerts``. Failures
    are logged and, except for IntegrationPollEventError, recorded as an error status.
    """
    logger.info("Polling information for integration alert %s", integration_alert)
    try:
        actions = integration_alert.configured_integration.integration.module
        poll_for_updates = actions.poll_for_updates
        previous_output = json.loads(integration_alert.output_data)
        # poll_for_updates returns a pair; only the first element is used here.
        output_data, _ = poll_for_updates(previous_output)

        integration_alert.status = IntegrationAlertStatuses.DONE.name
        integration_alert.output_data = json.dumps(output_data)
        polling_integration_alerts.remove(integration_alert)

    except exceptions.IntegrationNoMethodImplementationError:
        logger.error("No poll_for_updates function found for integration alert %s", integration_alert)
        integration_alert.status = IntegrationAlertStatuses.ERROR_POLLING.name

    except exceptions.IntegrationPollEventError:
        # Not necessarily a failure: also raised when the poll should be retried later,
        # so the status is left as it is.
        logger.debug("Polling for integration alert %s failed", integration_alert)

    except exceptions.IntegrationOutputFormatError:
        logger.error("Integration alert %s formatting error", integration_alert)
        integration_alert.status = IntegrationAlertStatuses.ERROR_POLLING_FORMATTING.name

    except Exception:
        logger.exception("Error polling integration alert %s", integration_alert)
        integration_alert.status = IntegrationAlertStatuses.ERROR_POLLING.name