Source code for honeycomb.cli

# -*- coding: utf-8 -*-
"""Honeycomb Command Line Interface."""
from __future__ import absolute_import

import os
import ctypes
import logging.config

import six
import click
from pythonjsonlogger import jsonlogger

from honeycomb import __version__
from honeycomb.defs import DEBUG_LOG_FILE, INTEGRATIONS, SERVICES
from honeycomb.commands import commands_list
from honeycomb.utils.config_utils import process_config

    auto_envvar_prefix="HC",  # all parameters will be taken from HC_PARAMETER first
    help_option_names=["-h", "--help"],

logger = logging.getLogger(__name__), context_settings=CONTEXT_SETTINGS, invoke_without_command=True,
@click.option("--home", "-H", default=click.get_app_dir("honeycomb"),
              help="Honeycomb home path", type=click.Path(), show_default=True)
@click.option("--iamroot", is_flag=True, default=False, help="Force run as root (NOT RECOMMENDED!)")
# TODO: --config help needs rephrasing
@click.option("--config", "-c", type=click.Path(exists=True, dir_okay=False, resolve_path=True),
              help="Path to a honeycomb.yml file that provides instructions")
@click.option("--verbose", "-v", envvar="DEBUG", is_flag=True, default=False, help="Enable verbose logging")
@click.version_option(version=__version__, message="Honeycomb, version %(version)s")
def cli(ctx, home, iamroot, config, verbose):
    """Honeycomb is a honeypot framework."""
    setup_logging(home, verbose)

    logger.debug("Honeycomb v%s", __version__, extra={"version": __version__})
    logger.debug("running command %s (%s)",, ctx.params,
                 extra={"command":, "params": ctx.params})

        is_admin = os.getuid() == 0
    except AttributeError:
        is_admin = ctypes.windll.shell32.IsUserAnAdmin()

    if is_admin:
        if not iamroot:
            raise click.ClickException("Honeycomb should not run as a privileged user, if you are just "
                                       "trying to bind to a low port try running `setcap 'cap_net_bind_service=+ep' "
                                       "$(which honeycomb)` instead. If you insist, use --iamroot")
        logger.warn("running as root!")

    ctx.obj["HOME"] = home

    logger.debug("ctx: {}".format(ctx.obj))

    if config:
        return process_config(ctx, config)

[docs]class MyLogger(logging.Logger): """Custom Logger."""
[docs] def makeRecord(self, name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None): """Override default logger to allow overriding of internal attributes.""" # See below commented section for a simple example of what the docstring refers to if six.PY2: rv = logging.LogRecord(name, level, fn, lno, msg, args, exc_info, func) else: rv = logging.LogRecord(name, level, fn, lno, msg, args, exc_info, func, sinfo) if extra is None: extra = dict() extra.update({"pid": os.getpid(), "uid": os.getuid(), "gid": os.getgid(), "ppid": os.getppid()}) for key in extra: # if (key in ["message", "asctime"]) or (key in rv.__dict__): # raise KeyError("Attempt to overwrite %r in LogRecord" % key) rv.__dict__[key] = extra[key] return rv
[docs]def setup_logging(home, verbose): """Configure logging for honeycomb.""" logging.setLoggerClass(MyLogger) logging.config.dictConfig({ "version": 1, "disable_existing_loggers": False, "formatters": { "console": { "format": "%(levelname)-8s [%(asctime)s %(name)s] %(filename)s:%(lineno)s %(funcName)s: %(message)s", }, "json": { "()": jsonlogger.JsonFormatter, "format": "%(levelname)s %(asctime)s %(name)s %(filename)s %(lineno)s %(funcName)s %(message)s", }, }, "handlers": { "default": { "level": "DEBUG" if verbose else "INFO", "class": "logging.StreamHandler", "formatter": "console", }, "file": { "level": "DEBUG", "class": "logging.handlers.WatchedFileHandler", "filename": os.path.join(home, DEBUG_LOG_FILE), "formatter": "json", }, }, "loggers": { "": { "handlers": ["default", "file"], "level": "DEBUG", "propagate": True, }, } })
def _mkhome(home): def mkdir_if_not_exists(path): try: if not os.path.exists(path): os.makedirs(path) except OSError as exc: logging.exception(exc) raise click.ClickException("Unable to create Honeycomb home dirs") home = os.path.realpath(home) for path in [home, os.path.join(home, SERVICES), os.path.join(home, INTEGRATIONS)]: mkdir_if_not_exists(path)