diff --git a/internal/configs/key/env.go b/internal/configs/key/env.go index d9bd0be..e8f5457 100644 --- a/internal/configs/key/env.go +++ b/internal/configs/key/env.go @@ -4,4 +4,5 @@ const ( LogEnv = "LOG_ENV" TelegramToken = "TELEGRAM_TOKEN" + AdminChatID = "ADMIN_CHAT_ID" ) diff --git a/internal/data/imarket.go b/internal/data/imarket.go index ba7a6d4..a7d8a21 100644 --- a/internal/data/imarket.go +++ b/internal/data/imarket.go @@ -4,6 +4,7 @@ import "me.thuanle/bbot/internal/data/market" type IMarket interface { GetFuturePrice(symbol string) (float64, float64, int64, bool) + GetAllPremiumIndex() (map[string]market.PremiumIndex, error) GetAllFundRate() (map[string]float64, map[string]int64) GetSpotPrice(symbol string) (float64, bool) @@ -12,4 +13,9 @@ type IMarket interface { // Alpha token methods IsAlphaToken(symbol string) bool GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool) + + // Trading pair methods + IsSpotPair(symbol string) bool + IsFuturesPair(symbol string) bool + RefreshTradingPairCache() error } diff --git a/internal/data/market/future_price.go b/internal/data/market/future_price.go index 872a80e..d414cdf 100644 --- a/internal/data/market/future_price.go +++ b/internal/data/market/future_price.go @@ -1,64 +1,85 @@ package market import ( - "github.com/adshao/go-binance/v2/futures" - "github.com/rs/zerolog/log" + "context" "strconv" + "time" + + "github.com/rs/zerolog/log" ) func (ms *MarketData) GetFuturePrice(symbol string) (float64, float64, int64, bool) { - ms.mu.RLock() - defer ms.mu.RUnlock() - p, ok := ms.futureMarkPrice[symbol] - if !ok { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + premiums, err := ms.futuresClient.NewPremiumIndexService().Symbol(symbol).Do(ctx) + if err != nil { + log.Error().Err(err).Str("symbol", symbol).Msg("Failed to fetch futures premium index") return 0, 0, 0, false } - return p, ms.futureFundingRate[symbol], ms.futureNextFundingTime[symbol], true -} -func (ms *MarketData) StartFutureWsMarkPrice() error { - _, _, err := futures.WsAllMarkPriceServe(ms.futureWsMarkPriceHandler, ms.futureWsErrHandler) + if len(premiums) == 0 { + return 0, 0, 0, false + } + + p := premiums[0] + markPrice, err := strconv.ParseFloat(p.MarkPrice, 64) if err != nil { - return err + return 0, 0, 0, false } - return nil + + fundingRate, err := strconv.ParseFloat(p.LastFundingRate, 64) + if err != nil { + fundingRate = 0 + } + + return markPrice, fundingRate, p.NextFundingTime, true } -func (ms *MarketData) futureWsMarkPriceHandler(event futures.WsAllMarkPriceEvent) { - ms.mu.Lock() - defer ms.mu.Unlock() - for _, priceEvent := range event { - price, err := strconv.ParseFloat(priceEvent.MarkPrice, 64) +type PremiumIndex struct { + MarkPrice float64 + FundingRate float64 + NextFundingTime int64 +} + +func (ms *MarketData) GetAllPremiumIndex() (map[string]PremiumIndex, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + premiums, err := ms.futuresClient.NewPremiumIndexService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch all futures premium index") + return nil, err + } + + result := make(map[string]PremiumIndex, len(premiums)) + for _, p := range premiums { + markPrice, err := strconv.ParseFloat(p.MarkPrice, 64) if err != nil { continue } - - fundingRate, err := strconv.ParseFloat(priceEvent.FundingRate, 64) - if err != nil { - continue + rate, _ := strconv.ParseFloat(p.LastFundingRate, 64) + result[p.Symbol] = PremiumIndex{ + MarkPrice: markPrice, + FundingRate: rate, + NextFundingTime: p.NextFundingTime, } - - ms.futureMarkPrice[priceEvent.Symbol] = price - ms.futureFundingRate[priceEvent.Symbol] = fundingRate - ms.futureNextFundingTime[priceEvent.Symbol] = priceEvent.NextFundingTime } -} -func (ms *MarketData) futureWsErrHandler(err error) { - log.Debug().Err(err).Msg("Ws Error. Restart socket") - _ = ms.StartFutureWsMarkPrice() + return result, nil } func (ms *MarketData) GetAllFundRate() (map[string]float64, map[string]int64) { - ms.mu.RLock() - defer ms.mu.RUnlock() - rates := make(map[string]float64, len(ms.futureFundingRate)) - for k, v := range ms.futureFundingRate { - rates[k] = v + all, err := ms.GetAllPremiumIndex() + if err != nil { + return make(map[string]float64), make(map[string]int64) } - times := make(map[string]int64, len(ms.futureNextFundingTime)) - for k, v := range ms.futureNextFundingTime { - times[k] = v + + rates := make(map[string]float64, len(all)) + times := make(map[string]int64, len(all)) + for sym, p := range all { + rates[sym] = p.FundingRate + times[sym] = p.NextFundingTime } return rates, times } diff --git a/internal/data/market/main.go b/internal/data/market/main.go index 86333cf..df1418d 100644 --- a/internal/data/market/main.go +++ b/internal/data/market/main.go @@ -4,37 +4,42 @@ import ( "sync" "time" + "github.com/adshao/go-binance/v2" + "github.com/adshao/go-binance/v2/futures" "github.com/rs/zerolog/log" ) type MarketData struct { - mu sync.RWMutex - futureMarkPrice map[string]float64 - futureFundingRate map[string]float64 - futureNextFundingTime map[string]int64 - - spotPrice map[string]float64 + // Trading pair caches + spotPairs map[string]bool + futuresPairs map[string]bool + pairCacheMutex sync.RWMutex + lastPairCacheUpdate time.Time // Alpha token cache alphaTokens map[string]AlphaTokenInfo alphaCacheMutex sync.RWMutex lastAlphaCacheUpdate time.Time + + // Binance REST clients + spotClient *binance.Client + futuresClient *futures.Client } func NewMarketData() *MarketData { log.Info().Msg("Start market service") ms := &MarketData{ - futureMarkPrice: make(map[string]float64), - futureFundingRate: make(map[string]float64), - futureNextFundingTime: make(map[string]int64), - - spotPrice: make(map[string]float64), - alphaTokens: make(map[string]AlphaTokenInfo), + spotPairs: make(map[string]bool), + futuresPairs: make(map[string]bool), + alphaTokens: make(map[string]AlphaTokenInfo), + spotClient: binance.NewClient("", ""), + futuresClient: futures.NewClient("", ""), } - _ = ms.StartFutureWsMarkPrice() - _ = ms.StartSpotWsMarkPrice() - // Initialize Alpha token cache and refresh every hour + if err := ms.refreshTradingPairCache(); err != nil { + log.Error().Err(err).Msg("Failed initial trading pair cache load") + } + go ms.pairCacheRefreshLoop() go ms.alphaCacheRefreshLoop() return ms diff --git a/internal/data/market/spot_price.go b/internal/data/market/spot_price.go index 2320d4c..76a7de6 100644 --- a/internal/data/market/spot_price.go +++ b/internal/data/market/spot_price.go @@ -1,21 +1,36 @@ package market import ( + "context" "strconv" + "time" - "github.com/adshao/go-binance/v2" "github.com/rs/zerolog/log" ) func (ms *MarketData) GetSpotPrice(symbol string) (float64, bool) { - ms.mu.RLock() - p, ok := ms.spotPrice[symbol] - ms.mu.RUnlock() - if ok { - return p, true + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + prices, err := ms.spotClient.NewListPricesService().Symbol(symbol).Do(ctx) + if err != nil { + log.Error().Err(err).Str("symbol", symbol).Msg("Failed to fetch spot price") + return ms.getAlphaPrice(symbol) } - // If not found, check if it's an Alpha token + if len(prices) == 0 { + return ms.getAlphaPrice(symbol) + } + + price, err := strconv.ParseFloat(prices[0].Price, 64) + if err != nil || price == 0 { + return ms.getAlphaPrice(symbol) + } + + return price, true +} + +func (ms *MarketData) getAlphaPrice(symbol string) (float64, bool) { if ms.IsAlphaToken(symbol) { if alphaToken, exists := ms.GetAlphaToken(symbol); exists { if price := alphaToken.GetPrice(); price > 0 { @@ -23,32 +38,5 @@ func (ms *MarketData) GetSpotPrice(symbol string) (float64, bool) { } } } - return 0, false } - -func (ms *MarketData) StartSpotWsMarkPrice() error { - _, _, err := binance.WsAllMarketsStatServe(ms.spotWsAllMarketsStatHandler, ms.spotWsErrHandler) //.WsAllMarkPriceServe(ms.futureWsMarkPriceHandler, ms.futureWsErrHandler) - - if err != nil { - return err - } - return nil -} - -func (ms *MarketData) spotWsAllMarketsStatHandler(event binance.WsAllMarketsStatEvent) { - ms.mu.Lock() - defer ms.mu.Unlock() - for _, priceEvent := range event { - price, err := strconv.ParseFloat(priceEvent.LastPrice, 64) - if err != nil { - continue - } - ms.spotPrice[priceEvent.Symbol] = price - } -} - -func (ms *MarketData) spotWsErrHandler(err error) { - log.Debug().Err(err).Msg("Spot Ws Error. Restart socket") - _ = ms.StartSpotWsMarkPrice() -} diff --git a/internal/data/market/trading_pairs.go b/internal/data/market/trading_pairs.go new file mode 100644 index 0000000..a064a9b --- /dev/null +++ b/internal/data/market/trading_pairs.go @@ -0,0 +1,74 @@ +package market + +import ( + "context" + "time" + + "github.com/rs/zerolog/log" +) + +func (ms *MarketData) refreshTradingPairCache() error { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + spotInfo, err := ms.spotClient.NewExchangeInfoService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch spot exchange info") + return err + } + + futuresInfo, err := ms.futuresClient.NewExchangeInfoService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch futures exchange info") + return err + } + + ms.pairCacheMutex.Lock() + defer ms.pairCacheMutex.Unlock() + + ms.spotPairs = make(map[string]bool, len(spotInfo.Symbols)) + for _, s := range spotInfo.Symbols { + if s.Status == "TRADING" { + ms.spotPairs[s.Symbol] = true + } + } + + ms.futuresPairs = make(map[string]bool, len(futuresInfo.Symbols)) + for _, s := range futuresInfo.Symbols { + if s.Status == "TRADING" { + ms.futuresPairs[s.Symbol] = true + } + } + + ms.lastPairCacheUpdate = time.Now() + log.Info(). + Int("spot", len(ms.spotPairs)). + Int("futures", len(ms.futuresPairs)). + Msg("Trading pair cache refreshed") + return nil +} + +func (ms *MarketData) pairCacheRefreshLoop() { + ms.refreshTradingPairCache() + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + for range ticker.C { + ms.refreshTradingPairCache() + } +} + +func (ms *MarketData) IsSpotPair(symbol string) bool { + ms.pairCacheMutex.RLock() + defer ms.pairCacheMutex.RUnlock() + return ms.spotPairs[symbol] +} + +func (ms *MarketData) IsFuturesPair(symbol string) bool { + ms.pairCacheMutex.RLock() + defer ms.pairCacheMutex.RUnlock() + return ms.futuresPairs[symbol] +} + +func (ms *MarketData) RefreshTradingPairCache() error { + return ms.refreshTradingPairCache() +} diff --git a/internal/helper/binancex/symbol.go b/internal/helper/binancex/symbol.go index b4a45cc..02287a4 100644 --- a/internal/helper/binancex/symbol.go +++ b/internal/helper/binancex/symbol.go @@ -29,8 +29,7 @@ var ( ) func testSym(sym string) bool { - _, _, _, test := data.Market.GetFuturePrice(sym) - return test + return data.Market.IsFuturesPair(sym) } func Token2Symbols(token string) []string { diff --git a/internal/services/controllers/market.go b/internal/services/controllers/market.go index bf30653..0f1adbe 100644 --- a/internal/services/controllers/market.go +++ b/internal/services/controllers/market.go @@ -13,15 +13,19 @@ func GetTopPrices() ([]string, []float64, []float64) { topPrice := make([]float64, n) topRate := make([]float64, n) + all, err := data.Market.GetAllPremiumIndex() + if err != nil { + return topSym, topPrice, topRate + } + for i, sym := range strategy.TopPriceSymbols { - price, rate, _, ok := data.Market.GetFuturePrice(sym) + p, ok := all[sym] if !ok { continue } topSym[i] = sym - topPrice[i] = price - topRate[i] = rate - + topPrice[i] = p.MarkPrice + topRate[i] = p.FundingRate } return topSym, topPrice, topRate } diff --git a/internal/services/tele/command.go b/internal/services/tele/command.go index 86c1277..2549e0d 100644 --- a/internal/services/tele/command.go +++ b/internal/services/tele/command.go @@ -16,6 +16,10 @@ var commandList = []telebot.Command{ Text: "fee", Description: "(f) - show top funding fee", }, + { + Text: "refresh", + Description: "Refresh trading pair cache", + }, } func setupCommands(b *telebot.Bot) error { @@ -36,6 +40,7 @@ func setupCommands(b *telebot.Bot) error { //info b.Handle("/p", commands.OnGetTopPrices) b.Handle("/fee", commands.OnGetTopFundingFee) + b.Handle("/refresh", commands.OnRefreshPairCache) //any text b.Handle(telebot.OnText, commands.OnChatHandler) diff --git a/internal/services/tele/commands/market.go b/internal/services/tele/commands/market.go index 49e0001..319a3c8 100644 --- a/internal/services/tele/commands/market.go +++ b/internal/services/tele/commands/market.go @@ -1,7 +1,12 @@ package commands import ( + "os" + "strconv" + "gopkg.in/telebot.v3" + "me.thuanle/bbot/internal/configs/key" + "me.thuanle/bbot/internal/data" "me.thuanle/bbot/internal/services/controllers" "me.thuanle/bbot/internal/services/tele/chat" "me.thuanle/bbot/internal/services/tele/view" @@ -16,3 +21,14 @@ func OnGetTopFundingFee(context telebot.Context) error { fee, float64s, cds := controllers.GetTopFundingFee() return chat.ReplyMessagePre(context, view.RenderOnGetTopFundingFeeMessage(fee, float64s, cds)) } + +func OnRefreshPairCache(context telebot.Context) error { + adminID, err := strconv.ParseInt(os.Getenv(key.AdminChatID), 10, 64) + if err != nil || adminID == 0 || context.Sender().ID != adminID { + return nil + } + if err := data.Market.RefreshTradingPairCache(); err != nil { + return chat.ReplyMessage(context, "Failed to refresh trading pair cache") + } + return chat.ReplyMessage(context, "Trading pair cache refreshed") +}