algorunner
@todo: define all user required objects, and then import to
algorunner/__init__.py to simplify document generation and developer experience.
AlgoRunner is a simple framework that can be used to build algorithmic trading strategies against cryptocurrency exchanges.
For information on the development, check the repository.
This documentation covers the objects that you'll need to interact with when using AlgoRunner; it doesn't necessarily cover the internals of the system.
Using AlgoRunner
AlgoRunner is simple: Strategy objects are executed against an exchange via an API Adapter.
Writing Strategies
The abstract methods required to be implemented are located in algorunner.BaseStrategy.
Integration with "Hooks"
AlgoRunner has the concept of "hooks": simple events that are dispatched containing performance metrics or status updates. See algorunner.hooks for more information about these.
Exceptions and Error Handling
Check out the algorunner.exceptions module for more information.
Writing Adapters
Adapters must inherit from algorunner.adapters.base.Adapter.
View Source
""" .. include:: pdoc.md """ from algorunner.strategy.base import BaseStrategy from algorunner.adapters.base import Adapter from algorunner.hooks import ( Hook, InvalidHookHandler, hook, clear_handlers ) from algorunner.monitoring import Timer __docformat__ = "restructuredtext" __all__ = [ 'BaseStrategy', 'Adapter', 'Hook', 'InvalidHookHandler', 'hook', 'clear_handlers', 'Timer', ]
View Source
class BaseStrategy(ABC):
    """
    A `BaseStrategy` is the container for an algorithm. It responds to
    incoming market payloads (see `process`) and generates events for the
    internal `_SyncAgent`, which is responsible for synchronising state
    between the API and the algorithm. (In this context "state" means
    transactions, balances and positions.)
    """

    class _SyncAgent:
        """Worker that drains the sync queue on a background thread.

        It owns the `AccountState`, applies update messages to it, and
        forwards authorised `TransactionRequest`s to the API adapter.
        """

        def __init__(self, queue: Queue, adapter: Adapter, auth: Callable):
            """
            :param queue: queue the agent reads messages from.
            :param adapter: API adapter used to execute transactions.
            :param auth: callable invoked as ``auth(state, trx)``; it must
                return the request with ``approved`` set.
            """
            self.queue = queue
            self.api = adapter
            self.state = AccountState()
            self.authorisation_guard = auth

        def start(self):
            """Spawn the listener thread and return immediately."""
            # @todo - do we *really* want it as a daemon; I see two arguments here.
            # tests obviously *must* be run against a daemon.
            self.thread = Thread(target=self._listen, daemon=True)
            self.thread.start()
            logger.debug("initiated sync agent")

        def stop(self, reason: Optional[str] = None):
            """Queue a `ShutdownRequest` and block until the thread exits."""
            logger.info(f"sync agent termination requested: '{reason}'")
            self.queue.put(ShutdownRequest(reason))
            self.thread.join()
            logger.info("sync agent has halted.")

        def is_running(self) -> bool:
            """Return True while the listener thread is alive."""
            return self.thread.is_alive()

        def _listen(self):
            """Consume queue messages until a `ShutdownRequest` arrives.

            :raises StrategyExceptionThresholdBreached: once more than five
                exceptions have been caught while handling messages.
            """
            logger.info("listening for events and inbound messages")
            # @todo count exceptions over past 5 mins. Probs a job for a contextmanager.
            exception_count = 0

            while True:
                message = self.queue.get()
                message_type = type(message)

                try:
                    if message_type == ShutdownRequest:
                        logger.warning(
                            f"terminating trader thread ({message.reason})."
                        )
                        break
                    elif message_type == TransactionRequest:
                        logger.info(
                            "request recieved from strategy to execute a transaction"
                        )
                        self._transaction_handler(message)
                        continue
                    elif not is_update(message_type):
                        logger.error("recieved message without known handler")
                        continue

                    # Anything left is an update message: let it mutate state.
                    message.handle(self.state)
                except Exception as e:
                    # Count the failure; without this the threshold below
                    # could never be reached.
                    exception_count += 1
                    logger.error(
                        "sync agent has caught an exception. will try to continue.",
                        {
                            "exc": e,
                            "exc_count": exception_count,
                        },
                    )
                    if exception_count > 5:
                        logger.critical(
                            "exception rate has breached threshold, failing.."
                        )
                        raise StrategyExceptionThresholdBreached(
                            "too many exceptions encountered!"
                        ) from e

            logger.warning("syncagent has completed")

        def _transaction_handler(self, trx: TransactionRequest):
            """Run the authorisation guard, then dispatch approved requests.

            The adapter call is timed and reported through
            `Hook.API_EXECUTE_DURATION`.
            """
            trx = self.authorisation_guard(self.state, trx)
            if not trx.approved:
                logger.info(f"transaction rejected: {trx.reason}")
                return

            with Timer(Hook.API_EXECUTE_DURATION):
                try:
                    logger.info(
                        "transaction accepted: passing to API adapter for dispatch"
                    )
                    self.api.execute(trx)
                except InvalidOrder as err:
                    # Best effort: an invalid order must not stop the agent,
                    # but it should leave a trace rather than vanish.
                    logger.error(
                        f"adapter rejected transaction as an invalid order: {err}"
                    )

    def __call__(self, tick: Tick):
        """Process one tick, timing it via `Hook.PROCESS_DURATION`."""
        with Timer(Hook.PROCESS_DURATION):
            self.process(tick)

    def start_sync(self, queue: Queue, adapter: Adapter):
        """Create the sync agent for this strategy.

        The agent is only constructed here; it is not started.

        :param queue: queue shared between the strategy and the agent.
        :param adapter: API adapter the agent executes transactions with.
        """
        # `authorise` matches the guard signature `(state, trx) -> trx`.
        self.sync_agent = self._SyncAgent(queue, adapter, self.authorise)
        self.sync_queue = queue

    def _place_order(self, params: dict):
        """Build a `TransactionRequest`, queue it, and fire `ORDER_REQUEST`.

        :raises InvalidOrder: if `params` cannot construct a request.
        """
        try:
            request = TransactionRequest(**params)
        except TypeError as err:
            raise InvalidOrder(
                "invalid parameters supplied when attempting order"
            ) from err

        self.sync_queue.put(request)
        hook(Hook.ORDER_REQUEST, request)

    def order_sell_limit(self, **kwargs):
        """Request a limit sell.

        :raises InvalidOrder: if symbol, price or quantity is missing or falsy.
        """
        if not all(kwargs.get(key) for key in ("symbol", "price", "quantity")):
            raise InvalidOrder(
                "order_sell_limit requires symbol, price, and quantity"
            )
        self._place_order({**kwargs, "order_type": OrderType.LIMIT_SELL})

    def order_sell_market(self, **kwargs):
        """Request a market sell.

        :raises InvalidOrder: if symbol or quantity is missing or falsy.
        """
        if not all(kwargs.get(key) for key in ("symbol", "quantity")):
            raise InvalidOrder("order_sell_market requires symbol and quantity")
        self._place_order({**kwargs, "order_type": OrderType.MARKET_SELL})

    def order_buy_limit(self, **kwargs):
        """Request a limit buy.

        :raises InvalidOrder: if symbol, price or quantity is missing or falsy.
        """
        if not all(kwargs.get(key) for key in ("symbol", "price", "quantity")):
            raise InvalidOrder(
                "order_buy_limit requires symbol, price, and quantity"
            )
        self._place_order({**kwargs, "order_type": OrderType.LIMIT_BUY})

    def order_buy_market(self, **kwargs):
        """Request a market buy.

        :raises InvalidOrder: if symbol or quantity is missing or falsy.
        """
        if not all(kwargs.get(key) for key in ("symbol", "quantity")):
            raise InvalidOrder("order_buy_market requires symbol and quantity")
        # NOTE(review): assumes OrderType defines MARKET_BUY alongside
        # LIMIT_BUY / LIMIT_SELL / MARKET_SELL - confirm against the enum.
        self._place_order({**kwargs, "order_type": OrderType.MARKET_BUY})

    def shutdown(self):
        """Stop the sync agent, blocking until its thread has exited."""
        self.sync_agent.stop("shutdown requested")

    def account_state(self) -> AccountState:
        """Return the `AccountState` object owned by the sync agent."""
        return self.sync_agent.state

    def register_hooks(self, hooks: Dict[Hook, Callable]):
        """Register each callable in `hooks` as a handler for its hook.

        Handlers are wrapped so that their return value is discarded.
        """

        def wrapper(fn):
            def _wrapped(*args, **kwargs):
                fn(*args, **kwargs)

            return _wrapped

        for h, handler in hooks.items():
            hook_handler(h)(wrapper(handler))

    def authorise(
        self, state: AccountState, trx: TransactionRequest
    ) -> TransactionRequest:
        """Default authorisation guard: approve every request.

        :param state: current account state (unused by this default).
        :param trx: the request under consideration.
        :returns: `trx` with ``approved`` set to True.
        """
        logger.info(
            "no authorisation guard set: automatically authorising order"
        )
        trx.approved = True
        return trx

    @abstractmethod
    def process(self, tick: Tick):
        """Handle a single incoming tick; subclasses must implement this."""
A BaseStrategy is the container for an algorithm; it simply needs to respond
to incoming market payloads and be able to generate events for the internal
SyncAgent Actor which is responsible for synchronising state between the API
and the algorithm. (In this context "state" means transactions, balances and
positions)
View Source
def start_sync(self, queue: Queue, adapter: Adapter):
    """Create the sync agent for this strategy.

    The agent is only constructed here; it is not started.

    :param queue: queue shared between the strategy and the agent.
    :param adapter: API adapter the agent executes transactions with.
    """
    # The agent calls its guard as `guard(state, trx)` and reads
    # `.approved` on the result, which is the contract of `authorise`.
    self.sync_agent = self._SyncAgent(queue, adapter, self.authorise)
    self.sync_queue = queue
View Source
def order_sell_limit(self, **kwargs):
    """Request a limit sell.

    :param kwargs: order fields; symbol, price and quantity are required.
    :raises InvalidOrder: if a required field is missing or falsy.
    """
    # `.get` so that an absent key reports InvalidOrder, not KeyError.
    if not all(kwargs.get(key) for key in ("symbol", "price", "quantity")):
        raise InvalidOrder(
            "order_sell_limit requires symbol, price, and quantity"
        )
    self._place_order({**kwargs, "order_type": OrderType.LIMIT_SELL})
View Source
def order_sell_market(self, **kwargs):
    """Request a market sell.

    :param kwargs: order fields; symbol and quantity are required.
    :raises InvalidOrder: if a required field is missing or falsy.
    """
    # `.get` so that an absent key reports InvalidOrder, not KeyError.
    if not all(kwargs.get(key) for key in ("symbol", "quantity")):
        raise InvalidOrder("order_sell_market requires symbol and quantity")
    self._place_order({**kwargs, "order_type": OrderType.MARKET_SELL})
View Source
def order_buy_limit(self, **kwargs):
    """Request a limit buy.

    :param kwargs: order fields; symbol, price and quantity are required.
    :raises InvalidOrder: if a required field is missing or falsy.
    """
    # `.get` so that an absent key reports InvalidOrder, not KeyError.
    if not all(kwargs.get(key) for key in ("symbol", "price", "quantity")):
        raise InvalidOrder(
            "order_buy_limit requires symbol, price, and quantity"
        )
    self._place_order({**kwargs, "order_type": OrderType.LIMIT_BUY})
View Source
def order_buy_market(self, **kwargs):
    """Request a market buy.

    :param kwargs: order fields; symbol and quantity are required.
    :raises InvalidOrder: if a required field is missing or falsy.
    """
    # `.get` so that an absent key reports InvalidOrder, not KeyError.
    if not all(kwargs.get(key) for key in ("symbol", "quantity")):
        raise InvalidOrder("order_buy_market requires symbol and quantity")
    # A buy must not be submitted with the sell order type.
    # NOTE(review): assumes OrderType defines MARKET_BUY alongside
    # LIMIT_BUY / LIMIT_SELL / MARKET_SELL - confirm against the enum.
    self._place_order({**kwargs, "order_type": OrderType.MARKET_BUY})
View Source
def shutdown(self):
    """Ask the sync agent to stop, passing a fixed reason string."""
    agent = self.sync_agent
    agent.stop("shutdown requested")
View Source
def account_state(self) -> AccountState:
    """Return the `AccountState` object owned by the sync agent.

    :returns: the agent's state object itself, not a copy.
    """
    # `_SyncAgent.__init__` stores the state under `state`.
    return self.sync_agent.state
View Source
def register_hooks(self, hooks: Dict[Hook, Callable]):
    """Register every callable in `hooks` as a handler for its hook.

    :param hooks: mapping of hook to the callable that should handle it.
        Each callable is wrapped so that its return value is discarded.
    """

    def _discard_result(handler):
        def _call(*args, **kwargs):
            handler(*args, **kwargs)

        return _call

    for target, handler in hooks.items():
        register = hook_handler(target)
        register(_discard_result(handler))
View Source
@abstractmethod
def process(self, tick: Tick):
    """Handle a single incoming tick.

    Abstract: concrete strategies supply the implementation. The base
    body does nothing and returns None.
    """
View Source
class Adapter(ABC):
    """Interface every exchange adapter has to implement."""

    def __init__(self, sync_queue: Queue):
        # Kept for subclasses; the base class does not use the queue itself.
        self.sync_queue = sync_queue

    @abstractmethod
    def connect(self, creds: messages.Credentials):
        """Authenticate with the exchange and populate the associated
        `Trader` object with the latest state."""

    @abstractmethod
    def monitor_user(self):
        """@todo"""

    @abstractmethod
    def run(self, symbol: str, process: Callable):
        """Execute the underlying strategy, making sure any required data
        transformation is carried out correctly."""

    @abstractmethod
    def execute(self, trx: messages.TransactionRequest) -> bool:
        """Execute the given transaction request.

        NOTE(review): the meaning of the boolean result is not defined
        here - confirm against concrete adapters.
        """

    @abstractmethod
    def disconnect(self):
        """Disconnect the adapter; concrete adapters define what that means."""
Required interface that an exchange adapter must implement.
View Source
@abstractmethod
def connect(self, creds: messages.Credentials):
    """Authenticate with the exchange and populate the associated
    `Trader` object with the latest state.

    :param creds: credentials used to authenticate.
    """
connect authenticates with the exchange, and also populates
the associated Trader object with the latest state.
View Source
@abstractmethod
def monitor_user(self):
    """@todo: contract not yet documented; adapters must implement it."""
@todo
View Source
@abstractmethod
def run(self, symbol: str, process: Callable):
    """Execute the underlying strategy, making sure any required data
    transformation is carried out correctly.

    :param symbol: symbol to run against.
    :param process: callable that handles the data.
    """
run executes the underlying strategy, ensuring that any data transformation required is carried out correctly.
View Source
@abstractmethod
def execute(self, trx: messages.TransactionRequest) -> bool:
    """Execute the given transaction request.

    NOTE(review): the meaning of the boolean result is not defined here -
    confirm against concrete adapters.
    """
View Source
@abstractmethod
def disconnect(self):
    """Disconnect the adapter; concrete adapters define what that means."""
View Source
class Hook(Enum):
    """Hook represents valid hooks for user-defined functions to listen for."""

    # NOTE(review): the three RUNNER_* hooks are presumably runner lifecycle
    # events; their dispatch sites are not visible here - confirm.
    RUNNER_INITIALISED = 1
    RUNNER_STARTING = 2
    RUNNER_STOPPING = 3
    # Dispatched by BaseStrategy._place_order with the queued request.
    ORDER_REQUEST = 4
    # Reported by the Timer wrapped around the adapter's execute call.
    API_EXECUTE_DURATION = 5
    # Reported by the Timer wrapped around BaseStrategy.process.
    PROCESS_DURATION = 6
Hook represents valid hooks for user-defined functions to listen for.
Inherited Members
- enum.Enum
- name
- value
View Source
class InvalidHookHandler(Exception):
    """Signals that `hook_handler` could not register the given hook."""
Raised when hook_handler is unable to register a given hook.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
View Source
def hook(hook: Hook, *args, **kwargs):
    """`hook(...)` calls any handlers associated with a given Hook.

    :param hook: the hook whose handlers should run.
    :param args: positional arguments forwarded to every handler.
    :param kwargs: keyword arguments forwarded to every handler.

    A `TypeError` raised while calling a handler is logged and the remaining
    handlers still run; any other exception propagates to the caller.
    """
    for cb in _registered_hooks.get(hook, []):
        try:
            cb(*args, **kwargs)
        except TypeError:
            # Not every callable has `__name__` (e.g. functools.partial);
            # fall back to repr so the error path cannot itself raise.
            handler_name = getattr(cb, "__name__", repr(cb))
            logger.error(f"invalid handler ({handler_name}) for hook ({hook})")
hook(...) calls any handlers associated with a given Hook.
View Source
def clear_handlers(hook: Optional[Hook] = None):
    """`clear_handlers` clears registered handlers; optionally for
    a specific hook.

    :param hook: when given, only that hook's handlers are removed and all
        other hooks are left untouched. When omitted, every hook is cleared.
    """
    if hook is not None:
        # Return here even when nothing is registered for `hook`; falling
        # through would wipe the handlers of every other hook.
        if _registered_hooks.get(hook):
            _registered_hooks[hook] = []
        return

    _registered_hooks.clear()
clear_handlers clears registered handlers; optionally for
a specific hook
View Source
class Timer:
    """Simple timer based context manager, used for performance monitoring
    in conjunction with hooks.

    On exit the elapsed time is stored in `duration` (seconds). If a hook
    was supplied, it is dispatched with the elapsed milliseconds.
    """

    def __init__(self, trigger_hook: Optional[Hook] = None):
        """
        :param trigger_hook: hook to dispatch on exit; None disables dispatch.
        """
        self.duration = None
        self.hook = trigger_hook

    def __enter__(self):
        """Record the start time and return the timer.

        Returning `self` lets `with Timer() as t:` bind the timer.
        """
        self.start = time()
        return self

    def __exit__(self, exc_type, exc_val, traceback):
        """Record the duration, log any exception, and dispatch the hook.

        Returns None, so an exception raised in the block still propagates.
        """
        self.duration = time() - self.start
        if exc_type:
            logger.error(
                f"detected exception during monitoring: {exc_type} ({exc_val})"
            )

        if self.hook:
            hook(self.hook, self.ms())

    def ms(self) -> float:
        """Return the duration in milliseconds, rounded to a whole number.

        Only meaningful once the `with` block has exited; before that
        `duration` is None.
        """
        return round(self.duration * 1000)
Simple timer based context manager, used for performance monitoring in conjunction with hooks.
View Source
def __init__(self, trigger_hook: Optional[Hook] = None):
    """Set up a timer that has not run yet.

    :param trigger_hook: optional hook stored on the instance as `hook`.
    """
    self.hook = trigger_hook
    # No measurement has been taken yet.
    self.duration = None
View Source
def ms(self) -> float:
    """Return `duration` converted from seconds to rounded milliseconds."""
    milliseconds = self.duration * 1000
    return round(milliseconds)