algorunner

@todo: define all user required objects, and then import to algorunner/__init__.py to simplify document generation and developer experience.

AlgoRunner is a simple framework that can be used to build algorithmic trading strategies against cryptocurrency exchanges.

For information on the development, check the repository.

This documentation covers the objects that you'll need to interact with when using AlgoRunner; it doesn't necessarily cover the internals of the system.

Using AlgoRunner

AlgoRunner is simple: Strategy objects are executed against an exchange via an API Adapter.

Writing Strategies

The abstract methods required to be implemented are located in algorunner.BaseStrategy.

Integration with "Hooks"

AlgoRunner has the concept of "hooks": simple events that are dispatched containing performance metrics or status updates. See algorunner.hooks for more information about these.

Exceptions and Error Handling

Check out the algorunner.exceptions module for more information.

Writing Adapters

Adapters must inherit from algorunner.adapters.base.Adapter.

View Source
"""
    .. include:: pdoc.md
"""

from algorunner.strategy.base import BaseStrategy
from algorunner.adapters.base import Adapter
from algorunner.hooks import (
    Hook, InvalidHookHandler, hook, clear_handlers
)
from algorunner.monitoring import Timer

# Declares the markup used by this package's docstrings (the module docstring
# above uses a reStructuredText ``.. include::`` directive).
__docformat__ = "restructuredtext"
# Public API of the package: exactly the names re-exported by the imports above.
__all__ = [
    'BaseStrategy',
    'Adapter',
    'Hook',
    'InvalidHookHandler',
    'hook',
    'clear_handlers',
    'Timer',
]
#   class BaseStrategy(abc.ABC):
View Source
class BaseStrategy(ABC):
    """
    A `BaseStrategy` is the container for an algorithm, it simply needs to respond
    to incoming market payloads and be able to generate events for the internal
    `SyncAgent` Actor which is responsible for synchronising state between the API
    and the algorithm. (In this context "state" means transactions, balances and
    positions)

    Subclasses implement `process`. The `order_*` helpers queue a
    `TransactionRequest` for the sync agent, which passes each request through
    `authorise` before handing approved requests to the API adapter.
    """

    class _SyncAgent:
        """Worker that consumes the sync queue on a background thread.

        Each queued message is one of: a `ShutdownRequest` (ends the loop), a
        `TransactionRequest` (run through the authorisation guard, then
        executed via the adapter), or an update message, which is applied to
        the held `AccountState` through the message's `handle` method.
        """

        # Number of handler exceptions tolerated before the listener gives up
        # and raises `StrategyExceptionThresholdBreached`.
        EXCEPTION_THRESHOLD = 5

        def __init__(self, queue: Queue, adapter: Adapter, auth: Callable):
            """Store collaborators and create an empty `AccountState`.

            :param queue: queue the agent reads messages from.
            :param adapter: API adapter used to execute approved transactions.
            :param auth: guard called as ``auth(state, trx)``; must return the
                transaction with its ``approved`` attribute set.
            """
            self.queue = queue
            self.api = adapter
            self.state = AccountState()
            self.authorisation_guard = auth
            # Populated by `start`; kept as None so `stop`/`is_running` are
            # safe to call on an agent that was never started.
            self.thread: Optional[Thread] = None

        def start(self):
            """Spawn the listener thread."""
            # @todo - do we *really* want it as a daemon; I see two arguments here.
            # tests obviously *must* be run against a daemon.
            self.thread = Thread(target=self._listen, daemon=True)
            self.thread.start()
            logger.debug("initiated sync agent")

        def stop(self, reason: Optional[str] = None):
            """Ask the listener to terminate and block until it has.

            :param reason: free-text reason included in the log output.
            """
            logger.info(f"sync agent termination requested: '{reason}'")
            if self.thread is None:
                # Nothing is consuming the queue, so do not leave a stale
                # shutdown request behind for a later `start`.
                logger.info("sync agent was never started; nothing to stop.")
                return

            self.queue.put(ShutdownRequest(reason))

            self.thread.join()
            logger.info("sync agent has halted.")

        def is_running(self) -> bool:
            """Return True while the listener thread is alive."""
            return self.thread is not None and self.thread.is_alive()

        def _listen(self):
            """Consume messages from the queue until a `ShutdownRequest` arrives.

            :raises StrategyExceptionThresholdBreached: once more than
                `EXCEPTION_THRESHOLD` exceptions have been caught.
            """
            logger.info("listening for events and inbound messages")

            # @todo count exceptions over past 5 mins. Probs a job for a contextmanager.
            exception_count = 0
            while True:
                message = self.queue.get()
                message_type = type(message)

                try:
                    if message_type == ShutdownRequest:
                        logger.warning(
                            f"terminating trader thread ({message.reason})."
                        )
                        break
                    elif message_type == TransactionRequest:
                        logger.info(
                            "request recieved from strategy to execute a transaction"
                        )
                        self._transaction_handler(message)
                        continue
                    elif not is_update(message_type):
                        logger.error("recieved message without known handler")
                        continue

                    message.handle(self.state)
                except Exception as e:
                    # The counter must advance here, otherwise the threshold
                    # below can never be reached.
                    exception_count += 1
                    logger.error(
                        "sync agent has caught an exception. will try to continue. "
                        f"(exc={e!r}, exc_count={exception_count})"
                    )

                    if exception_count > self.EXCEPTION_THRESHOLD:
                        logger.critical(
                            "exception rate has breached threshold, failing.."
                        )
                        raise StrategyExceptionThresholdBreached(
                            "too many exceptions encountered!"
                        ) from e

            logger.warning("syncagent has completed")

        def _transaction_handler(self, trx: TransactionRequest):
            """Authorise `trx` and, if approved, execute it via the adapter.

            The adapter call is timed and reported on
            `Hook.API_EXECUTE_DURATION`. An `InvalidOrder` raised by the
            adapter is logged and otherwise ignored (best effort).
            """
            trx = self.authorisation_guard(self.state, trx)
            if not trx.approved:
                logger.info(f"transaction rejected: {trx.reason}")
                return

            with Timer(Hook.API_EXECUTE_DURATION):
                try:
                    logger.info(
                        "transaction accepted: passing to API adapter for dispatch"
                    )
                    self.api.execute(trx)
                except InvalidOrder as e:
                    # Still best effort, but no longer silent.
                    logger.error(f"API adapter rejected order as invalid: {e}")

    def __call__(self, tick: Tick):
        """Run `process` for `tick`, reporting the duration on `Hook.PROCESS_DURATION`."""
        with Timer(Hook.PROCESS_DURATION):
            self.process(tick)

    def start_sync(self, queue: Queue, adapter: Adapter):
        """Create the sync agent for this strategy and remember its queue.

        The agent is constructed but not started by this method.

        :param queue: queue shared between the strategy and the sync agent.
        :param adapter: API adapter the agent uses to execute transactions.
        """
        # `authorise` has the (state, trx) -> trx shape the agent calls its
        # guard with; the class defines no `log` attribute.
        self.sync_agent = self._SyncAgent(queue, adapter, self.authorise)
        self.sync_queue = queue

    def _place_order(self, params: dict):
        """Build a `TransactionRequest` from `params` and queue it.

        After queueing, `Hook.ORDER_REQUEST` is dispatched with the request.

        :raises InvalidOrder: if `params` cannot construct a `TransactionRequest`.
        """
        # Only the construction is guarded, so a TypeError raised elsewhere is
        # not misreported as an invalid order.
        try:
            request = TransactionRequest(**params)
        except TypeError as err:
            raise InvalidOrder(
                "invalid parameters supplied when attempting order"
            ) from err

        self.sync_queue.put(request)
        hook(Hook.ORDER_REQUEST, request)

    def order_sell_limit(self, **kwargs):
        """Queue a limit sell order.

        :param kwargs: must include truthy ``symbol``, ``price`` and
            ``quantity``; all kwargs are forwarded to `TransactionRequest`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        # `.get` so that a missing key raises InvalidOrder, not KeyError.
        if not all(kwargs.get(k) for k in ("symbol", "price", "quantity")):
            raise InvalidOrder(
                "order_sell_limit requires symbol, price, and quantity"
            )

        self._place_order({**kwargs, "order_type": OrderType.LIMIT_SELL})

    def order_sell_market(self, **kwargs):
        """Queue a market sell order.

        :param kwargs: must include truthy ``symbol`` and ``quantity``; all
            kwargs are forwarded to `TransactionRequest`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        if not all(kwargs.get(k) for k in ("symbol", "quantity")):
            raise InvalidOrder("order_sell_market requires symbol and quantity")

        self._place_order({**kwargs, "order_type": OrderType.MARKET_SELL})

    def order_buy_limit(self, **kwargs):
        """Queue a limit buy order.

        :param kwargs: must include truthy ``symbol``, ``price`` and
            ``quantity``; all kwargs are forwarded to `TransactionRequest`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        if not all(kwargs.get(k) for k in ("symbol", "price", "quantity")):
            raise InvalidOrder(
                "order_buy_limit requires symbol, price, and quantity"
            )

        self._place_order({**kwargs, "order_type": OrderType.LIMIT_BUY})

    def order_buy_market(self, **kwargs):
        """Queue a market buy order.

        :param kwargs: must include truthy ``symbol`` and ``quantity``; all
            kwargs are forwarded to `TransactionRequest`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        if not all(kwargs.get(k) for k in ("symbol", "quantity")):
            raise InvalidOrder("order_buy_market requires symbol and quantity")

        # Previously sent MARKET_SELL, which turned every market buy into a sell.
        # NOTE(review): assumes OrderType.MARKET_BUY exists alongside
        # LIMIT_BUY / LIMIT_SELL / MARKET_SELL - confirm.
        self._place_order({**kwargs, "order_type": OrderType.MARKET_BUY})

    def shutdown(self):
        """Stop the sync agent, blocking until its thread has finished."""
        self.sync_agent.stop("shutdown requested")

    def account_state(self) -> AccountState:
        """Return the `AccountState` maintained by the sync agent."""
        # The agent stores it as `state`; it has no `account_state` attribute.
        return self.sync_agent.state

    def register_hooks(self, hooks: Dict[Hook, Callable]):
        """Register one handler per hook.

        :param hooks: mapping of `Hook` to the callable to invoke for it. Each
            callable is wrapped in a ``*args, **kwargs`` pass-through before
            being handed to `hook_handler`; its return value is discarded.
        """
        def wrapper(fn):
            def _wrapped(*args, **kwargs):
                fn(*args, **kwargs)

            return _wrapped

        for h, handler in hooks.items():
            hook_handler(h)(wrapper(handler))

    def authorise(
        self, state: AccountState, trx: TransactionRequest
    ) -> TransactionRequest:
        """Default authorisation guard: approve every transaction.

        :param state: current account state (unused by this default).
        :param trx: the transaction awaiting approval.
        :returns: `trx` with ``approved`` set to True.
        """
        logger.info(
            "no authorisation guard set: automatically authorising order"
        )
        trx.approved = True
        return trx

    @abstractmethod
    def process(self, tick: Tick):
        """Handle one incoming market payload; called for every tick via `__call__`."""
        pass

A BaseStrategy is the container for an algorithm, it simply needs to respond to incoming market payloads and be able to generate events for the internal SyncAgent Actor which is responsible for synchronising state between the API and the algorithm. (In this context "state" means transactions, balances and positions)

#   def start_sync(self, queue: queue.Queue, adapter: algorunner.adapters.base.Adapter):
View Source
    def start_sync(self, queue: Queue, adapter: Adapter):
        """Create the sync agent for this strategy and remember its queue.

        :param queue: queue shared between the strategy and the sync agent.
        :param adapter: API adapter the agent uses to execute transactions.
        """
        # `authorise` has the (state, trx) -> trx shape the agent calls its
        # guard with; the class defines no `log` attribute.
        self.sync_agent = self._SyncAgent(queue, adapter, self.authorise)
        self.sync_queue = queue
#   def order_sell_limit(self, **kwargs):
View Source
    def order_sell_limit(self, **kwargs):
        """Queue a limit sell order.

        :param kwargs: must include truthy ``symbol``, ``price`` and
            ``quantity``; all kwargs are forwarded to `_place_order`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        # `.get` so that a missing key raises InvalidOrder, not KeyError.
        if not all(kwargs.get(k) for k in ("symbol", "price", "quantity")):
            raise InvalidOrder(
                "order_sell_limit requires symbol, price, and quantity"
            )

        self._place_order({**kwargs, "order_type": OrderType.LIMIT_SELL})
#   def order_sell_market(self, **kwargs):
View Source
    def order_sell_market(self, **kwargs):
        """Queue a market sell order.

        :param kwargs: must include truthy ``symbol`` and ``quantity``; all
            kwargs are forwarded to `_place_order`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        # `.get` so that a missing key raises InvalidOrder, not KeyError.
        if not all(kwargs.get(k) for k in ("symbol", "quantity")):
            raise InvalidOrder("order_sell_market requires symbol and quantity")

        self._place_order({**kwargs, "order_type": OrderType.MARKET_SELL})
#   def order_buy_limit(self, **kwargs):
View Source
    def order_buy_limit(self, **kwargs):
        """Queue a limit buy order.

        :param kwargs: must include truthy ``symbol``, ``price`` and
            ``quantity``; all kwargs are forwarded to `_place_order`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        # `.get` so that a missing key raises InvalidOrder, not KeyError.
        if not all(kwargs.get(k) for k in ("symbol", "price", "quantity")):
            raise InvalidOrder(
                "order_buy_limit requires symbol, price, and quantity"
            )

        self._place_order({**kwargs, "order_type": OrderType.LIMIT_BUY})
#   def order_buy_market(self, **kwargs):
View Source
    def order_buy_market(self, **kwargs):
        """Queue a market buy order.

        :param kwargs: must include truthy ``symbol`` and ``quantity``; all
            kwargs are forwarded to `_place_order`.
        :raises InvalidOrder: if a required value is missing or falsy.
        """
        # `.get` so that a missing key raises InvalidOrder, not KeyError.
        if not all(kwargs.get(k) for k in ("symbol", "quantity")):
            raise InvalidOrder("order_buy_market requires symbol and quantity")

        # Previously sent MARKET_SELL, which turned every market buy into a sell.
        # NOTE(review): assumes OrderType.MARKET_BUY exists alongside
        # LIMIT_BUY / LIMIT_SELL / MARKET_SELL - confirm.
        self._place_order({**kwargs, "order_type": OrderType.MARKET_BUY})
#   def shutdown(self):
View Source
    def shutdown(self):
        """Ask the sync agent to stop, giving "shutdown requested" as the reason."""
        agent = self.sync_agent
        agent.stop("shutdown requested")
#   def account_state(self) -> algorunner.mutations.AccountState:
View Source
    def account_state(self) -> AccountState:
        """Return the `AccountState` maintained by the sync agent."""
        # The agent stores it as `state`; it has no `account_state` attribute,
        # so the previous lookup raised AttributeError.
        return self.sync_agent.state
#   def register_hooks(self, hooks: Dict[algorunner.hooks.Hook, Callable]):
View Source
    def register_hooks(self, hooks: Dict[Hook, Callable]):
        """Register one handler per hook.

        :param hooks: mapping of `Hook` to the callable to invoke for it. Each
            callable is wrapped in a ``*args, **kwargs`` pass-through before
            being handed to `hook_handler`; its return value is discarded.
        """
        def passthrough(target):
            def _wrapped(*args, **kwargs):
                target(*args, **kwargs)

            return _wrapped

        for key in hooks:
            register = hook_handler(key)
            register(passthrough(hooks[key]))
View Source
    def authorise(
        self, state: AccountState, trx: TransactionRequest
    ) -> TransactionRequest:
        """Default authorisation guard: approve every transaction.

        :param state: current account state (unused by this default).
        :param trx: the transaction awaiting approval.
        :returns: `trx` with ``approved`` set to True.
        """
        message = "no authorisation guard set: automatically authorising order"
        logger.info(message)

        setattr(trx, "approved", True)
        return trx
#  
@abstractmethod
def process( self, tick: Union[pandas.core.frame.DataFrame, algorunner.adapters.messages.RawTickPayload] ):
View Source
    @abstractmethod
    def process(self, tick: Tick):
        """Handle one incoming market payload; subclasses must implement this."""
#   class Adapter(abc.ABC):
View Source
class Adapter(ABC):
    """Interface every exchange adapter is required to implement."""

    def __init__(self, sync_queue: Queue):
        """Keep a reference to the queue given at construction time."""
        self.sync_queue = sync_queue

    @abstractmethod
    def connect(self, creds: messages.Credentials):
        """Authenticate with the exchange using `creds`, and populate the
        associated `Trader` object with the latest state."""

    @abstractmethod
    def monitor_user(self):
        """@todo: behaviour not yet documented."""

    @abstractmethod
    def run(self, symbol: str, process: Callable):
        """Execute the underlying strategy, ensuring that any data
        transformation required is carried out correctly."""

    @abstractmethod
    def execute(self, trx: messages.TransactionRequest) -> bool:
        """Execute the transaction request `trx`.

        NOTE(review): the meaning of the boolean result is not specified
        here - confirm against implementations.
        """

    @abstractmethod
    def disconnect(self):
        """Counterpart to `connect`.

        NOTE(review): expected teardown semantics are not specified here.
        """

Required interface that an exchange adapter must implement.

#  
@abstractmethod
def connect(self, creds: algorunner.adapters.messages.Credentials):
View Source
    @abstractmethod
    def connect(self, creds: messages.Credentials):
        """Authenticate with the exchange using `creds`, and populate the
        associated `Trader` object with the latest state."""

connect authenticates with the exchange, and also populates the associated Trader object with the latest state.

#  
@abstractmethod
def monitor_user(self):
View Source
    @abstractmethod
    def monitor_user(self):
        """@todo: behaviour not yet documented."""

@todo

#  
@abstractmethod
def run(self, symbol: str, process: Callable):
View Source
    @abstractmethod
    def run(self, symbol: str, process: Callable):
        """Execute the underlying strategy, ensuring that any data
        transformation required is carried out correctly."""

run executes the underlying strategy, ensuring that any data transformation required is carried out correctly.

#  
@abstractmethod
def execute(self, trx: algorunner.adapters.messages.TransactionRequest) -> bool:
View Source
    @abstractmethod
    def execute(self, trx: messages.TransactionRequest) -> bool:
        """Execute the transaction request `trx`.

        NOTE(review): the meaning of the boolean result is not specified
        here - confirm against implementations.
        """
#  
@abstractmethod
def disconnect(self):
View Source
    @abstractmethod
    def disconnect(self):
        """Counterpart to `connect`.

        NOTE(review): expected teardown semantics are not specified here.
        """
#   class Hook(enum.Enum):
View Source
class Hook(Enum):
    """The set of hooks that user-defined handler functions may listen for."""

    # Runner lifecycle events.
    RUNNER_INITIALISED = 1
    RUNNER_STARTING = 2
    RUNNER_STOPPING = 3

    # Dispatched when a strategy queues an order request.
    ORDER_REQUEST = 4

    # Timing hooks; handlers receive the measured duration in milliseconds.
    API_EXECUTE_DURATION = 5
    PROCESS_DURATION = 6

Hook represents valid hooks for user-defined functions to listen for.

#   RUNNER_INITIALISED = <Hook.RUNNER_INITIALISED: 1>
#   RUNNER_STARTING = <Hook.RUNNER_STARTING: 2>
#   RUNNER_STOPPING = <Hook.RUNNER_STOPPING: 3>
#   ORDER_REQUEST = <Hook.ORDER_REQUEST: 4>
#   API_EXECUTE_DURATION = <Hook.API_EXECUTE_DURATION: 5>
#   PROCESS_DURATION = <Hook.PROCESS_DURATION: 6>
Inherited Members
enum.Enum
name
value
#   class InvalidHookHandler(builtins.Exception):
View Source
class InvalidHookHandler(Exception):
    """Error raised when `hook_handler` cannot register a handler for the
    requested hook."""

Raised when hook_handler is unable to register a given hook.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
#   def hook(hook: algorunner.hooks.Hook, *args, **kwargs):
View Source
def hook(hook: Hook, *args, **kwargs):
    """`hook(...)` calls any handlers associated with a given Hook.

    Every handler registered for `hook` is called with ``*args, **kwargs``.
    A `TypeError` raised by a handler is logged and the remaining handlers
    still run; any other exception propagates to the caller.
    """
    for cb in _registered_hooks.get(hook, []):
        try:
            cb(*args, **kwargs)
        except TypeError:
            # Not every callable has __name__ (e.g. functools.partial), so
            # fall back to repr rather than raising from the error path.
            handler_name = getattr(cb, "__name__", repr(cb))
            logger.error(f"invalid handler ({handler_name}) for hook ({hook})")

hook(...) calls any handlers associated with a given Hook.

#   def clear_handlers(hook: Optional[algorunner.hooks.Hook] = None):
View Source
def clear_handlers(hook: Optional[Hook] = None):
    """`clear_handlers` clears registered handlers; optionally for
    a specific hook

    :param hook: when given, only that hook's handlers are cleared; when
        omitted (None), the handlers of every hook are cleared.
    """
    if hook is not None:
        # Previously a hook with no handlers fell through to the clear-all
        # branch below and wiped every other hook's handlers too.
        if _registered_hooks.get(hook):
            _registered_hooks[hook] = []
        return

    _registered_hooks.clear()

clear_handlers clears registered handlers; optionally for a specific hook

#   class Timer:
View Source
class Timer:
    """Simple timer based context manager, used for performance monitoring
    in conjunction with hooks.

    On exit the elapsed time is stored in `duration` (seconds) and, if a hook
    was supplied, that hook is dispatched with the duration in milliseconds.
    Exceptions raised inside the ``with`` body are logged and not suppressed.
    """

    def __init__(self, trigger_hook: Optional[Hook] = None):
        """
        :param trigger_hook: hook to dispatch on exit; None means no dispatch.
        """
        self.duration = None  # seconds; set when the context exits
        self.hook = trigger_hook

    def __enter__(self):
        self.start = time()
        # Return the timer so `with Timer() as t:` binds it instead of None.
        return self

    def __exit__(self, exc_type, exc_val, traceback):
        self.duration = time() - self.start

        if exc_type:
            logger.error(
                f"detected exception during monitoring: {exc_type} ({exc_val})"
            )

        if self.hook:
            hook(self.hook, self.ms())

    def ms(self) -> float:
        """Return the measured duration in milliseconds, rounded to a whole number.

        Only valid after the context has exited; before that `duration` is None.
        """
        return round(self.duration * 1000)

Simple timer based context manager, used for performance monitoring in conjunction with hooks.

#   Timer(trigger_hook: Optional[algorunner.hooks.Hook] = None)
View Source
    def __init__(self, trigger_hook: Optional[Hook] = None):
        """Create a timer; `trigger_hook` is the hook to dispatch on exit, if any."""
        self.hook = trigger_hook
        self.duration = None  # no measurement taken yet
#   def ms(self) -> float:
View Source
    def ms(self) -> float:
        """Return the measured duration in milliseconds, rounded to a whole number."""
        milliseconds = self.duration * 1000
        return round(milliseconds)