Skip to content

Commit

Permalink
Merge pull request #10 from globophobe/feature/0.1.3
Browse files Browse the repository at this point in the history
Release v0.1.3
  • Loading branch information
globophobe authored Nov 6, 2022
2 parents 4aa492b + 25a0554 commit 6bdfad7
Show file tree
Hide file tree
Showing 31 changed files with 839 additions and 410 deletions.
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ This is the basis of a pipeline for live data from cryptocurrency exchanges. It

# How?

Sequences of trades that have equal symbol, timestamp, nanoseconds, and tick rule are aggregated. Aggregating trades in this way can increase information, as they are either orders of size or stop loss cascades.
Sequences of trades that have equal symbol, timestamp, and tick rule are aggregated. Aggregating trades in this way can increase information, as they are either orders of size or stop loss cascades.

As well, the number of messages can be reduced by 30-50%

By filtering aggregated messages, for example only emitting a mesage when an aggregated trade is greater than `min_volume`, the number of messages can be reduced more.
By filtering aggregated messages, for example only emitting a mesage when an aggregated trade is greater than or equal to a `significant_trade_filter`, the number of messages can be reduced more.

Example
-------
Expand Down Expand Up @@ -37,7 +37,7 @@ As it was aggregated from 4 raw trades, the second trade has ticks 4.
]
```

An example filtered message, emitted because the second aggregated trade exceeds `min_volume >= 1000`
An example filtered message, emitted because the second aggregated trade exceeds `significant_trade_filter >= 1000`

Information related to the first trade is aggregated with the second.

Expand All @@ -64,7 +64,7 @@ Information related to the first trade is aggregated with the second.

For 1m, 5m, 15m candles, there is an optional parameter `window_seconds`.

For settings, see [demo.py](https://github.com/globophobe/cryptofeed-werks/blob/main/demo.py)
For settings, see the [examples](https://github.com/globophobe/cryptofeed-werks/blob/main/examples/)

Supported exchanges
-------------------
Expand All @@ -81,16 +81,14 @@ Supported exchanges

:white_check_mark: Coinbase Pro

:white_check_mark: Deribit

:white_check_mark: FTX

:white_check_mark: Upbit

Contributing
------------

Install dependencies with `poetry install`. The demo is built with [invoke tasks](https://github.com/globophobe/cryptofeed-werks/blob/master/tasks.py). For example, `invoke build`
Install dependencies with `poetry install`. The docker example is built with [invoke tasks](https://github.com/globophobe/cryptofeed-werks/blob/master/tasks.py). For example, `invoke build`

Future plans
------------
Expand Down
9 changes: 0 additions & 9 deletions cryptofeed_werks/exchange.py

This file was deleted.

34 changes: 16 additions & 18 deletions cryptofeed_werks/exchanges/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
from .binance import BinanceExchange
from .bitfinex import BitfinexExchange
from .bitflyer import BitflyerExchange
from .bitmex import BitmexExchange
from .bybit import BybitExchange
from .coinbase import CoinbaseExchange
from .deribit import DeribitExchange
from .ftx import FTXExchange
from .upbit import UpbitExchange
from .binance import Binance
from .bitfinex import Bitfinex
from .bitflyer import Bitflyer
from .bitmex import Bitmex
from .bybit import Bybit
from .coinbase import Coinbase
from .ftx import FTX
from .upbit import Upbit

__all__ = [
"BinanceExchange",
"BitmexExchange",
"BitfinexExchange",
"BitflyerExchange",
"BybitExchange",
"CoinbaseExchange",
"DeribitExchange",
"FTXExchange",
"UpbitExchange",
"Binance",
"Bitmex",
"Bitfinex",
"Bitflyer",
"Bybit",
"Coinbase",
"FTX",
"Upbit",
]
35 changes: 20 additions & 15 deletions cryptofeed_werks/exchanges/binance.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
from datetime import datetime, timezone
from decimal import Decimal
from typing import Tuple

import pandas as pd
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Binance
from cryptofeed.exchanges import Binance as BaseBinance

from ..exchange import Exchange
from ..feed import Feed


class BinanceExchange(Exchange, Binance):
class Binance(Feed, BaseBinance):
def __init__(self, *args, **kwargs):
"""
Cryptofeed uses aggregate trade streams.
The raw trade stream is absolute trash and is frequently missing trades.
"""
super().__init__(*args, **kwargs)
self.last_id = None

async def _trade(self, msg: dict, timestamp: float):
def parse_datetime(self, value: int, unit: str = "ms") -> datetime:
"""Parse datetime with pandas."""
return pd.Timestamp(value, unit="ms").replace(tzinfo=timezone.utc)

async def _trade(self, msg: dict, timestamp: float) -> Tuple[str, dict, float]:
"""
Cryptofeed uses aggregate trade streams.
The raw trade stream frequently misses trades, and is absolute trash.
{
"e": "aggTrade", // Event type
"E": 123456789, // Event time
Expand All @@ -42,17 +48,16 @@ async def _trade(self, msg: dict, timestamp: float):
volume = price * notional
ticks = msg["l"] - msg["f"] + 1
assert ticks >= 1, "Ticks not greater than or equal to 1"
ts = (self.timestamp_normalize(msg["E"]),)
trade = {
"exchange": self.id,
"uid": int(msg["l"]), # Last trade ID
"symbol": msg["s"], # Do not normalize
"timestamp": ts,
t = {
"exchange": self.id.lower(),
"uid": self.last_id,
"symbol": msg["s"],
"timestamp": self.parse_datetime(msg["T"]),
"price": price,
"volume": volume,
"notional": notional,
"tickRule": -1 if msg["m"] else 1,
"ticks": ticks,
"isSequential": is_sequential,
}
await self.callback(TRADES, trade, ts)
await self.callback(TRADES, t, timestamp)
54 changes: 39 additions & 15 deletions cryptofeed_werks/exchanges/bitfinex.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,54 @@
from datetime import datetime, timezone
from decimal import Decimal
from typing import Tuple

import pandas as pd
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Bitfinex
from cryptofeed.exchanges import Bitfinex as BaseBitfinex
from cryptofeed.exchanges.bitfinex import LOG

from ..exchange import Exchange
from ..feed import Feed


class BitfinexExchange(Exchange, Bitfinex):
def std_symbol_to_exchange_symbol(self, symbol: str) -> str:
return "t" + symbol
class Bitfinex(Feed, BaseBitfinex):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.is_initialized = False

async def _trades(self, pair: str, msg: dict, timestamp: float):
def parse_datetime(self, value: int, unit: str = "ms") -> datetime:
"""Parse datetime with pandas."""
return pd.Timestamp(value, unit=unit).replace(tzinfo=timezone.utc)

async def _trades(
self, pair: str, msg: list, timestamp: float
) -> Tuple[str, dict, float]:
async def _trade_update(trade: list, timestamp: float):
uid, ts, notional, price = trade
price = Decimal(price)
notional = abs(Decimal(notional))
volume = price * notional
ts = self.timestamp_normalize(ts)
trade = {
"exchange": self.id,
volume = price * abs(notional)
t = {
"exchange": self.id.lower(),
"uid": uid,
"symbol": pair, # Do not normalize
"timestamp": ts,
"symbol": pair,
"timestamp": self.parse_datetime(ts),
"price": price,
"volume": volume,
"notional": notional,
"notional": abs(notional),
"tickRule": -1 if notional < 0 else 1,
}
await self.callback(TRADES, trade, ts)
await self.callback(TRADES, t, timestamp)

# Drop first message.
if self.is_initialized:
if isinstance(msg[1], list):
# Snapshot.
for trade in msg[1]:
await _trade_update(trade, timestamp)
elif msg[1] in ("te", "fte"):
# Update.
await _trade_update(msg[2], timestamp)
elif msg[1] not in ("tu", "ftu", "hb"):
# Ignore trade updates and heartbeats.
LOG.warning("%s %s: Unexpected trade message %s", self.id, pair, msg)
else:
self.is_initialized = True
33 changes: 21 additions & 12 deletions cryptofeed_werks/exchanges/bitflyer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Bitflyer
from typing import Tuple

from ..exchange import Exchange
from cryptofeed.defines import BUY, TRADES
from cryptofeed.exchanges import Bitflyer as BaseBitflyer

from ..feed import Feed

class BitflyerExchange(Exchange, Bitflyer):
async def _trade(self, msg: dict, timestamp: float):

class Bitflyer(Feed, BaseBitflyer):
def std_symbol_to_exchange_symbol(self, symbol: str) -> str:
"""Standard symbol to exchange symbol."""
return symbol.replace("/", "_")

def exchange_symbol_to_std_symbol(self, symbol: str) -> str:
"""Exchange symbol to standard symbol."""
return symbol.replace("_", "/")

async def _trade(self, msg: dict, timestamp: float) -> Tuple[str, dict, float]:
"""
{
"jsonrpc":"2.0",
Expand All @@ -31,15 +41,14 @@ async def _trade(self, msg: dict, timestamp: float):
price = update["price"]
notional = update["size"]
volume = price * notional
ts = self.timestamp_normalize(update["exec_date"])
trade = {
"exchange": self.id,
t = {
"exchange": self.id.lower(),
"uid": update["id"],
"symbol": pair, # Do not normalize
"timestamp": ts,
"symbol": self.exchange_symbol_to_std_symbol(pair),
"timestamp": self.parse_datetime(update["exec_date"]),
"price": price,
"volume": volume,
"notional": notional,
"tickRule": 1 if update["side"] == "BUY" else -1,
"tickRule": 1 if update["side"] == BUY else -1,
}
await self.callback(TRADES, trade, ts)
await self.callback(TRADES, t, timestamp)
24 changes: 12 additions & 12 deletions cryptofeed_werks/exchanges/bitmex.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from decimal import Decimal
from typing import Tuple

from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Bitmex
from cryptofeed.defines import BUY, TRADES
from cryptofeed.exchanges import Bitmex as BaseBitmex

from ..exchange import Exchange
from ..feed import Feed


class BitmexExchange(Exchange, Bitmex):
async def _trade(self, msg: dict, timestamp: float):
class Bitmex(Feed, BaseBitmex):
async def _trade(self, msg: dict, timestamp: float) -> Tuple[str, dict, float]:
"""
trade msg example
{
Expand All @@ -27,15 +28,14 @@ async def _trade(self, msg: dict, timestamp: float):
price = Decimal(data["price"])
volume = Decimal(data["foreignNotional"])
notional = volume / price
ts = self.timestamp_normalize(data["timestamp"])
trade = {
"exchange": self.id,
t = {
"exchange": self.id.lower(),
"uid": data["trdMatchID"],
"symbol": data["symbol"], # Do not normalize
"timestamp": ts,
"symbol": data["symbol"],
"timestamp": self.parse_datetime(data["timestamp"]),
"price": price,
"volume": volume,
"notional": notional,
"tickRule": 1 if data["side"] == "Buy" else -1,
"tickRule": 1 if data["side"].lower() == BUY else -1,
}
await self.callback(TRADES, trade, ts)
await self.callback(TRADES, t, timestamp)
Loading

0 comments on commit 6bdfad7

Please sign in to comment.