From 9c39423315a6114fda2ef7e64d7dd09cb4ebebb3 Mon Sep 17 00:00:00 2001 From: thuanle Date: Sun, 26 Apr 2026 15:14:09 +0700 Subject: [PATCH] Rewrite price lookup from WebSocket to REST API Replace unreliable WebSocket connections with on-demand REST API calls for spot and futures prices. Add cached trading pair list (refreshed hourly) for symbol validation, and /refresh command for manual updates. Co-Authored-By: Claude Opus 4.7 --- internal/data/imarket.go | 5 ++ internal/data/market/future_price.go | 82 +++++++++++------------ internal/data/market/main.go | 32 ++++----- internal/data/market/spot_price.go | 56 ++++++---------- internal/data/market/trading_pairs.go | 73 ++++++++++++++++++++ internal/helper/binancex/symbol.go | 3 +- internal/services/tele/command.go | 5 ++ internal/services/tele/commands/market.go | 6 ++ 8 files changed, 169 insertions(+), 93 deletions(-) create mode 100644 internal/data/market/trading_pairs.go diff --git a/internal/data/imarket.go b/internal/data/imarket.go index ba7a6d4..9e45a5f 100644 --- a/internal/data/imarket.go +++ b/internal/data/imarket.go @@ -12,4 +12,9 @@ type IMarket interface { // Alpha token methods IsAlphaToken(symbol string) bool GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool) + + // Trading pair methods + IsSpotPair(symbol string) bool + IsFuturesPair(symbol string) bool + RefreshTradingPairCache() } diff --git a/internal/data/market/future_price.go b/internal/data/market/future_price.go index 872a80e..9b6427e 100644 --- a/internal/data/market/future_price.go +++ b/internal/data/market/future_price.go @@ -1,64 +1,62 @@ package market import ( - "github.com/adshao/go-binance/v2/futures" - "github.com/rs/zerolog/log" + "context" "strconv" + "time" + + "github.com/rs/zerolog/log" ) func (ms *MarketData) GetFuturePrice(symbol string) (float64, float64, int64, bool) { - ms.mu.RLock() - defer ms.mu.RUnlock() - p, ok := ms.futureMarkPrice[symbol] - if !ok { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + premiums, err := ms.futuresClient.NewPremiumIndexService().Symbol(symbol).Do(ctx) + if err != nil { + log.Error().Err(err).Str("symbol", symbol).Msg("Failed to fetch futures premium index") return 0, 0, 0, false } - return p, ms.futureFundingRate[symbol], ms.futureNextFundingTime[symbol], true -} -func (ms *MarketData) StartFutureWsMarkPrice() error { - _, _, err := futures.WsAllMarkPriceServe(ms.futureWsMarkPriceHandler, ms.futureWsErrHandler) + if len(premiums) == 0 { + return 0, 0, 0, false + } + + p := premiums[0] + markPrice, err := strconv.ParseFloat(p.MarkPrice, 64) if err != nil { - return err + return 0, 0, 0, false } - return nil -} -func (ms *MarketData) futureWsMarkPriceHandler(event futures.WsAllMarkPriceEvent) { - ms.mu.Lock() - defer ms.mu.Unlock() - for _, priceEvent := range event { - price, err := strconv.ParseFloat(priceEvent.MarkPrice, 64) - if err != nil { - continue - } - - fundingRate, err := strconv.ParseFloat(priceEvent.FundingRate, 64) - if err != nil { - continue - } - - ms.futureMarkPrice[priceEvent.Symbol] = price - ms.futureFundingRate[priceEvent.Symbol] = fundingRate - ms.futureNextFundingTime[priceEvent.Symbol] = priceEvent.NextFundingTime + fundingRate, err := strconv.ParseFloat(p.LastFundingRate, 64) + if err != nil { + fundingRate = 0 } -} -func (ms *MarketData) futureWsErrHandler(err error) { - log.Debug().Err(err).Msg("Ws Error. Restart socket") - _ = ms.StartFutureWsMarkPrice() + return markPrice, fundingRate, p.NextFundingTime, true } func (ms *MarketData) GetAllFundRate() (map[string]float64, map[string]int64) { - ms.mu.RLock() - defer ms.mu.RUnlock() - rates := make(map[string]float64, len(ms.futureFundingRate)) - for k, v := range ms.futureFundingRate { - rates[k] = v + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + premiums, err := ms.futuresClient.NewPremiumIndexService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch all futures premium index") + return make(map[string]float64), make(map[string]int64) } - times := make(map[string]int64, len(ms.futureNextFundingTime)) - for k, v := range ms.futureNextFundingTime { - times[k] = v + + rates := make(map[string]float64, len(premiums)) + times := make(map[string]int64, len(premiums)) + + for _, p := range premiums { + rate, err := strconv.ParseFloat(p.LastFundingRate, 64) + if err != nil { + continue + } + rates[p.Symbol] = rate + times[p.Symbol] = p.NextFundingTime } + return rates, times } diff --git a/internal/data/market/main.go b/internal/data/market/main.go index 86333cf..f6a4eee 100644 --- a/internal/data/market/main.go +++ b/internal/data/market/main.go @@ -4,37 +4,39 @@ import ( "sync" "time" + "github.com/adshao/go-binance/v2" + "github.com/adshao/go-binance/v2/futures" "github.com/rs/zerolog/log" ) type MarketData struct { - mu sync.RWMutex - futureMarkPrice map[string]float64 - futureFundingRate map[string]float64 - futureNextFundingTime map[string]int64 - - spotPrice map[string]float64 + // Trading pair caches + spotPairs map[string]bool + futuresPairs map[string]bool + pairCacheMutex sync.RWMutex + lastPairCacheUpdate time.Time // Alpha token cache alphaTokens map[string]AlphaTokenInfo alphaCacheMutex sync.RWMutex lastAlphaCacheUpdate time.Time + + // Binance REST clients + spotClient *binance.Client + futuresClient *futures.Client } func NewMarketData() *MarketData { log.Info().Msg("Start market service") ms := &MarketData{ - futureMarkPrice: make(map[string]float64), - futureFundingRate: make(map[string]float64), - futureNextFundingTime: make(map[string]int64), - - spotPrice: make(map[string]float64), - alphaTokens: make(map[string]AlphaTokenInfo), + spotPairs: make(map[string]bool), + futuresPairs: make(map[string]bool), + alphaTokens: make(map[string]AlphaTokenInfo), + spotClient: binance.NewClient("", ""), + futuresClient: futures.NewClient("", ""), } - _ = ms.StartFutureWsMarkPrice() - _ = ms.StartSpotWsMarkPrice() - // Initialize Alpha token cache and refresh every hour + go ms.pairCacheRefreshLoop() go ms.alphaCacheRefreshLoop() return ms diff --git a/internal/data/market/spot_price.go b/internal/data/market/spot_price.go index 2320d4c..76a7de6 100644 --- a/internal/data/market/spot_price.go +++ b/internal/data/market/spot_price.go @@ -1,21 +1,36 @@ package market import ( + "context" "strconv" + "time" - "github.com/adshao/go-binance/v2" "github.com/rs/zerolog/log" ) func (ms *MarketData) GetSpotPrice(symbol string) (float64, bool) { - ms.mu.RLock() - p, ok := ms.spotPrice[symbol] - ms.mu.RUnlock() - if ok { - return p, true + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + prices, err := ms.spotClient.NewListPricesService().Symbol(symbol).Do(ctx) + if err != nil { + log.Error().Err(err).Str("symbol", symbol).Msg("Failed to fetch spot price") + return ms.getAlphaPrice(symbol) } - // If not found, check if it's an Alpha token + if len(prices) == 0 { + return ms.getAlphaPrice(symbol) + } + + price, err := strconv.ParseFloat(prices[0].Price, 64) + if err != nil || price == 0 { + return ms.getAlphaPrice(symbol) + } + + return price, true +} + +func (ms *MarketData) getAlphaPrice(symbol string) (float64, bool) { if ms.IsAlphaToken(symbol) { if alphaToken, exists := ms.GetAlphaToken(symbol); exists { if price := alphaToken.GetPrice(); price > 0 { @@ -23,32 +38,5 @@ func (ms *MarketData) GetSpotPrice(symbol string) (float64, bool) { } } } - return 0, false } - -func (ms *MarketData) StartSpotWsMarkPrice() error { - _, _, err := binance.WsAllMarketsStatServe(ms.spotWsAllMarketsStatHandler, ms.spotWsErrHandler) //.WsAllMarkPriceServe(ms.futureWsMarkPriceHandler, ms.futureWsErrHandler) - - if err != nil { - return err - } - return nil -} - -func (ms *MarketData) spotWsAllMarketsStatHandler(event binance.WsAllMarketsStatEvent) { - ms.mu.Lock() - defer ms.mu.Unlock() - for _, priceEvent := range event { - price, err := strconv.ParseFloat(priceEvent.LastPrice, 64) - if err != nil { - continue - } - ms.spotPrice[priceEvent.Symbol] = price - } -} - -func (ms *MarketData) spotWsErrHandler(err error) { - log.Debug().Err(err).Msg("Spot Ws Error. Restart socket") - _ = ms.StartSpotWsMarkPrice() -} diff --git a/internal/data/market/trading_pairs.go b/internal/data/market/trading_pairs.go new file mode 100644 index 0000000..230b1c5 --- /dev/null +++ b/internal/data/market/trading_pairs.go @@ -0,0 +1,73 @@ +package market + +import ( + "context" + "time" + + "github.com/rs/zerolog/log" +) + +func (ms *MarketData) refreshTradingPairCache() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + spotInfo, err := ms.spotClient.NewExchangeInfoService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch spot exchange info") + return + } + + futuresInfo, err := ms.futuresClient.NewExchangeInfoService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch futures exchange info") + return + } + + ms.pairCacheMutex.Lock() + defer ms.pairCacheMutex.Unlock() + + ms.spotPairs = make(map[string]bool, len(spotInfo.Symbols)) + for _, s := range spotInfo.Symbols { + if s.Status == "TRADING" { + ms.spotPairs[s.Symbol] = true + } + } + + ms.futuresPairs = make(map[string]bool, len(futuresInfo.Symbols)) + for _, s := range futuresInfo.Symbols { + if s.Status == "TRADING" { + ms.futuresPairs[s.Symbol] = true + } + } + + ms.lastPairCacheUpdate = time.Now() + log.Info(). + Int("spot", len(ms.spotPairs)). + Int("futures", len(ms.futuresPairs)). + Msg("Trading pair cache refreshed") +} + +func (ms *MarketData) pairCacheRefreshLoop() { + ms.refreshTradingPairCache() + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + for range ticker.C { + ms.refreshTradingPairCache() + } +} + +func (ms *MarketData) IsSpotPair(symbol string) bool { + ms.pairCacheMutex.RLock() + defer ms.pairCacheMutex.RUnlock() + return ms.spotPairs[symbol] +} + +func (ms *MarketData) IsFuturesPair(symbol string) bool { + ms.pairCacheMutex.RLock() + defer ms.pairCacheMutex.RUnlock() + return ms.futuresPairs[symbol] +} + +func (ms *MarketData) RefreshTradingPairCache() { + ms.refreshTradingPairCache() +} diff --git a/internal/helper/binancex/symbol.go b/internal/helper/binancex/symbol.go index b4a45cc..02287a4 100644 --- a/internal/helper/binancex/symbol.go +++ b/internal/helper/binancex/symbol.go @@ -29,8 +29,7 @@ var ( ) func testSym(sym string) bool { - _, _, _, test := data.Market.GetFuturePrice(sym) - return test + return data.Market.IsFuturesPair(sym) } func Token2Symbols(token string) []string { diff --git a/internal/services/tele/command.go b/internal/services/tele/command.go index 86c1277..2549e0d 100644 --- a/internal/services/tele/command.go +++ b/internal/services/tele/command.go @@ -16,6 +16,10 @@ var commandList = []telebot.Command{ Text: "fee", Description: "(f) - show top funding fee", }, + { + Text: "refresh", + Description: "Refresh trading pair cache", + }, } func setupCommands(b *telebot.Bot) error { @@ -36,6 +40,7 @@ func setupCommands(b *telebot.Bot) error { //info b.Handle("/p", commands.OnGetTopPrices) b.Handle("/fee", commands.OnGetTopFundingFee) + b.Handle("/refresh", commands.OnRefreshPairCache) //any text b.Handle(telebot.OnText, commands.OnChatHandler) diff --git a/internal/services/tele/commands/market.go b/internal/services/tele/commands/market.go index 49e0001..c73c7ab 100644 --- a/internal/services/tele/commands/market.go +++ b/internal/services/tele/commands/market.go @@ -2,6 +2,7 @@ package commands import ( "gopkg.in/telebot.v3" + "me.thuanle/bbot/internal/data" "me.thuanle/bbot/internal/services/controllers" "me.thuanle/bbot/internal/services/tele/chat" "me.thuanle/bbot/internal/services/tele/view" @@ -16,3 +17,8 @@ func OnGetTopFundingFee(context telebot.Context) error { fee, float64s, cds := controllers.GetTopFundingFee() return chat.ReplyMessagePre(context, view.RenderOnGetTopFundingFeeMessage(fee, float64s, cds)) } + +func OnRefreshPairCache(context telebot.Context) error { + data.Market.RefreshTradingPairCache() + return chat.ReplyMessage(context, "Trading pair cache refreshed") +}