From 9c39423315a6114fda2ef7e64d7dd09cb4ebebb3 Mon Sep 17 00:00:00 2001 From: thuanle Date: Sun, 26 Apr 2026 15:14:09 +0700 Subject: [PATCH 1/3] Rewrite price lookup from WebSocket to REST API Replace unreliable WebSocket connections with on-demand REST API calls for spot and futures prices. Add cached trading pair list (refreshed hourly) for symbol validation, and /refresh command for manual updates. Co-Authored-By: Claude Opus 4.7 --- internal/data/imarket.go | 5 ++ internal/data/market/future_price.go | 82 +++++++++++------------ internal/data/market/main.go | 32 ++++----- internal/data/market/spot_price.go | 56 ++++++---------- internal/data/market/trading_pairs.go | 73 ++++++++++++++++++++ internal/helper/binancex/symbol.go | 3 +- internal/services/tele/command.go | 5 ++ internal/services/tele/commands/market.go | 6 ++ 8 files changed, 169 insertions(+), 93 deletions(-) create mode 100644 internal/data/market/trading_pairs.go diff --git a/internal/data/imarket.go b/internal/data/imarket.go index ba7a6d4..9e45a5f 100644 --- a/internal/data/imarket.go +++ b/internal/data/imarket.go @@ -12,4 +12,9 @@ type IMarket interface { // Alpha token methods IsAlphaToken(symbol string) bool GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool) + + // Trading pair methods + IsSpotPair(symbol string) bool + IsFuturesPair(symbol string) bool + RefreshTradingPairCache() } diff --git a/internal/data/market/future_price.go b/internal/data/market/future_price.go index 872a80e..9b6427e 100644 --- a/internal/data/market/future_price.go +++ b/internal/data/market/future_price.go @@ -1,64 +1,62 @@ package market import ( - "github.com/adshao/go-binance/v2/futures" - "github.com/rs/zerolog/log" + "context" "strconv" + "time" + + "github.com/rs/zerolog/log" ) func (ms *MarketData) GetFuturePrice(symbol string) (float64, float64, int64, bool) { - ms.mu.RLock() - defer ms.mu.RUnlock() - p, ok := ms.futureMarkPrice[symbol] - if !ok { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + premiums, err := ms.futuresClient.NewPremiumIndexService().Symbol(symbol).Do(ctx) + if err != nil { + log.Error().Err(err).Str("symbol", symbol).Msg("Failed to fetch futures premium index") return 0, 0, 0, false } - return p, ms.futureFundingRate[symbol], ms.futureNextFundingTime[symbol], true -} -func (ms *MarketData) StartFutureWsMarkPrice() error { - _, _, err := futures.WsAllMarkPriceServe(ms.futureWsMarkPriceHandler, ms.futureWsErrHandler) + if len(premiums) == 0 { + return 0, 0, 0, false + } + + p := premiums[0] + markPrice, err := strconv.ParseFloat(p.MarkPrice, 64) if err != nil { - return err + return 0, 0, 0, false } - return nil -} -func (ms *MarketData) futureWsMarkPriceHandler(event futures.WsAllMarkPriceEvent) { - ms.mu.Lock() - defer ms.mu.Unlock() - for _, priceEvent := range event { - price, err := strconv.ParseFloat(priceEvent.MarkPrice, 64) - if err != nil { - continue - } - - fundingRate, err := strconv.ParseFloat(priceEvent.FundingRate, 64) - if err != nil { - continue - } - - ms.futureMarkPrice[priceEvent.Symbol] = price - ms.futureFundingRate[priceEvent.Symbol] = fundingRate - ms.futureNextFundingTime[priceEvent.Symbol] = priceEvent.NextFundingTime + fundingRate, err := strconv.ParseFloat(p.LastFundingRate, 64) + if err != nil { + fundingRate = 0 } -} -func (ms *MarketData) futureWsErrHandler(err error) { - log.Debug().Err(err).Msg("Ws Error. Restart socket") - _ = ms.StartFutureWsMarkPrice() + return markPrice, fundingRate, p.NextFundingTime, true } func (ms *MarketData) GetAllFundRate() (map[string]float64, map[string]int64) { - ms.mu.RLock() - defer ms.mu.RUnlock() - rates := make(map[string]float64, len(ms.futureFundingRate)) - for k, v := range ms.futureFundingRate { - rates[k] = v + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + premiums, err := ms.futuresClient.NewPremiumIndexService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch all futures premium index") + return make(map[string]float64), make(map[string]int64) } - times := make(map[string]int64, len(ms.futureNextFundingTime)) - for k, v := range ms.futureNextFundingTime { - times[k] = v + + rates := make(map[string]float64, len(premiums)) + times := make(map[string]int64, len(premiums)) + + for _, p := range premiums { + rate, err := strconv.ParseFloat(p.LastFundingRate, 64) + if err != nil { + continue + } + rates[p.Symbol] = rate + times[p.Symbol] = p.NextFundingTime } + return rates, times } diff --git a/internal/data/market/main.go b/internal/data/market/main.go index 86333cf..f6a4eee 100644 --- a/internal/data/market/main.go +++ b/internal/data/market/main.go @@ -4,37 +4,39 @@ import ( "sync" "time" + "github.com/adshao/go-binance/v2" + "github.com/adshao/go-binance/v2/futures" "github.com/rs/zerolog/log" ) type MarketData struct { - mu sync.RWMutex - futureMarkPrice map[string]float64 - futureFundingRate map[string]float64 - futureNextFundingTime map[string]int64 - - spotPrice map[string]float64 + // Trading pair caches + spotPairs map[string]bool + futuresPairs map[string]bool + pairCacheMutex sync.RWMutex + lastPairCacheUpdate time.Time // Alpha token cache alphaTokens map[string]AlphaTokenInfo alphaCacheMutex sync.RWMutex lastAlphaCacheUpdate time.Time + + // Binance REST clients + spotClient *binance.Client + futuresClient *futures.Client } func NewMarketData() *MarketData { log.Info().Msg("Start market service") ms := &MarketData{ - futureMarkPrice: make(map[string]float64), - futureFundingRate: make(map[string]float64), - futureNextFundingTime: make(map[string]int64), - - spotPrice: make(map[string]float64), - alphaTokens: make(map[string]AlphaTokenInfo), + spotPairs: make(map[string]bool), + futuresPairs: make(map[string]bool), + alphaTokens: make(map[string]AlphaTokenInfo), + spotClient: binance.NewClient("", ""), + futuresClient: futures.NewClient("", ""), } - _ = ms.StartFutureWsMarkPrice() - _ = ms.StartSpotWsMarkPrice() - // Initialize Alpha token cache and refresh every hour + go ms.pairCacheRefreshLoop() go ms.alphaCacheRefreshLoop() return ms diff --git a/internal/data/market/spot_price.go b/internal/data/market/spot_price.go index 2320d4c..76a7de6 100644 --- a/internal/data/market/spot_price.go +++ b/internal/data/market/spot_price.go @@ -1,21 +1,36 @@ package market import ( + "context" "strconv" + "time" - "github.com/adshao/go-binance/v2" "github.com/rs/zerolog/log" ) func (ms *MarketData) GetSpotPrice(symbol string) (float64, bool) { - ms.mu.RLock() - p, ok := ms.spotPrice[symbol] - ms.mu.RUnlock() - if ok { - return p, true + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + prices, err := ms.spotClient.NewListPricesService().Symbol(symbol).Do(ctx) + if err != nil { + log.Error().Err(err).Str("symbol", symbol).Msg("Failed to fetch spot price") + return ms.getAlphaPrice(symbol) } - // If not found, check if it's an Alpha token + if len(prices) == 0 { + return ms.getAlphaPrice(symbol) + } + + price, err := strconv.ParseFloat(prices[0].Price, 64) + if err != nil || price == 0 { + return ms.getAlphaPrice(symbol) + } + + return price, true +} + +func (ms *MarketData) getAlphaPrice(symbol string) (float64, bool) { if ms.IsAlphaToken(symbol) { if alphaToken, exists := ms.GetAlphaToken(symbol); exists { if price := alphaToken.GetPrice(); price > 0 { @@ -23,32 +38,5 @@ func (ms *MarketData) GetSpotPrice(symbol string) (float64, bool) { } } } - return 0, false } - -func (ms *MarketData) StartSpotWsMarkPrice() error { - _, _, err := binance.WsAllMarketsStatServe(ms.spotWsAllMarketsStatHandler, ms.spotWsErrHandler) //.WsAllMarkPriceServe(ms.futureWsMarkPriceHandler, ms.futureWsErrHandler) - - if err != nil { - return err - } - return nil -} - -func (ms *MarketData) spotWsAllMarketsStatHandler(event binance.WsAllMarketsStatEvent) { - ms.mu.Lock() - defer ms.mu.Unlock() - for _, priceEvent := range event { - price, err := strconv.ParseFloat(priceEvent.LastPrice, 64) - if err != nil { - continue - } - ms.spotPrice[priceEvent.Symbol] = price - } -} - -func (ms *MarketData) spotWsErrHandler(err error) { - log.Debug().Err(err).Msg("Spot Ws Error. Restart socket") - _ = ms.StartSpotWsMarkPrice() -} diff --git a/internal/data/market/trading_pairs.go b/internal/data/market/trading_pairs.go new file mode 100644 index 0000000..230b1c5 --- /dev/null +++ b/internal/data/market/trading_pairs.go @@ -0,0 +1,73 @@ +package market + +import ( + "context" + "time" + + "github.com/rs/zerolog/log" +) + +func (ms *MarketData) refreshTradingPairCache() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + spotInfo, err := ms.spotClient.NewExchangeInfoService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch spot exchange info") + return + } + + futuresInfo, err := ms.futuresClient.NewExchangeInfoService().Do(ctx) + if err != nil { + log.Error().Err(err).Msg("Failed to fetch futures exchange info") + return + } + + ms.pairCacheMutex.Lock() + defer ms.pairCacheMutex.Unlock() + + ms.spotPairs = make(map[string]bool, len(spotInfo.Symbols)) + for _, s := range spotInfo.Symbols { + if s.Status == "TRADING" { + ms.spotPairs[s.Symbol] = true + } + } + + ms.futuresPairs = make(map[string]bool, len(futuresInfo.Symbols)) + for _, s := range futuresInfo.Symbols { + if s.Status == "TRADING" { + ms.futuresPairs[s.Symbol] = true + } + } + + ms.lastPairCacheUpdate = time.Now() + log.Info(). + Int("spot", len(ms.spotPairs)). + Int("futures", len(ms.futuresPairs)). + Msg("Trading pair cache refreshed") +} + +func (ms *MarketData) pairCacheRefreshLoop() { + ms.refreshTradingPairCache() + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + for range ticker.C { + ms.refreshTradingPairCache() + } +} + +func (ms *MarketData) IsSpotPair(symbol string) bool { + ms.pairCacheMutex.RLock() + defer ms.pairCacheMutex.RUnlock() + return ms.spotPairs[symbol] +} + +func (ms *MarketData) IsFuturesPair(symbol string) bool { + ms.pairCacheMutex.RLock() + defer ms.pairCacheMutex.RUnlock() + return ms.futuresPairs[symbol] +} + +func (ms *MarketData) RefreshTradingPairCache() { + ms.refreshTradingPairCache() +} diff --git a/internal/helper/binancex/symbol.go b/internal/helper/binancex/symbol.go index b4a45cc..02287a4 100644 --- a/internal/helper/binancex/symbol.go +++ b/internal/helper/binancex/symbol.go @@ -29,8 +29,7 @@ var ( ) func testSym(sym string) bool { - _, _, _, test := data.Market.GetFuturePrice(sym) - return test + return data.Market.IsFuturesPair(sym) } func Token2Symbols(token string) []string { diff --git a/internal/services/tele/command.go b/internal/services/tele/command.go index 86c1277..2549e0d 100644 --- a/internal/services/tele/command.go +++ b/internal/services/tele/command.go @@ -16,6 +16,10 @@ var commandList = []telebot.Command{ Text: "fee", Description: "(f) - show top funding fee", }, + { + Text: "refresh", + Description: "Refresh trading pair cache", + }, } func setupCommands(b *telebot.Bot) error { @@ -36,6 +40,7 @@ func setupCommands(b *telebot.Bot) error { //info b.Handle("/p", commands.OnGetTopPrices) b.Handle("/fee", commands.OnGetTopFundingFee) + b.Handle("/refresh", commands.OnRefreshPairCache) //any text b.Handle(telebot.OnText, commands.OnChatHandler) diff --git a/internal/services/tele/commands/market.go b/internal/services/tele/commands/market.go index 49e0001..c73c7ab 100644 --- a/internal/services/tele/commands/market.go +++ b/internal/services/tele/commands/market.go @@ -2,6 +2,7 @@ package commands import ( "gopkg.in/telebot.v3" + "me.thuanle/bbot/internal/data" "me.thuanle/bbot/internal/services/controllers" "me.thuanle/bbot/internal/services/tele/chat" "me.thuanle/bbot/internal/services/tele/view" @@ -16,3 +17,8 @@ func OnGetTopFundingFee(context telebot.Context) error { fee, float64s, cds := controllers.GetTopFundingFee() return chat.ReplyMessagePre(context, view.RenderOnGetTopFundingFeeMessage(fee, float64s, cds)) } + +func OnRefreshPairCache(context telebot.Context) error { + data.Market.RefreshTradingPairCache() + return chat.ReplyMessage(context, "Trading pair cache refreshed") +} From c7128ff5168de6ac1c0a57c25d61c858ea2997ba Mon Sep 17 00:00:00 2001 From: thuanle Date: Sun, 26 Apr 2026 15:40:00 +0700 Subject: [PATCH 2/3] Address PR #15 review: batch API calls and admin-guard /refresh 1. Add GetAllPremiumIndex() to fetch all futures data in one call, used by GetTopPrices instead of per-symbol sequential calls. 2. Add ADMIN_CHAT_ID env check to /refresh command to restrict access to authorized users only. Co-Authored-By: Claude Opus 4.7 --- internal/configs/key/env.go | 1 + internal/data/imarket.go | 1 + internal/data/market/future_price.go | 39 ++++++++++++++++++----- internal/services/controllers/market.go | 12 ++++--- internal/services/tele/commands/market.go | 8 +++++ 5 files changed, 49 insertions(+), 12 deletions(-) diff --git a/internal/configs/key/env.go b/internal/configs/key/env.go index d9bd0be..e8f5457 100644 --- a/internal/configs/key/env.go +++ b/internal/configs/key/env.go @@ -4,4 +4,5 @@ const ( LogEnv = "LOG_ENV" TelegramToken = "TELEGRAM_TOKEN" + AdminChatID = "ADMIN_CHAT_ID" ) diff --git a/internal/data/imarket.go b/internal/data/imarket.go index 9e45a5f..708a77d 100644 --- a/internal/data/imarket.go +++ b/internal/data/imarket.go @@ -4,6 +4,7 @@ import "me.thuanle/bbot/internal/data/market" type IMarket interface { GetFuturePrice(symbol string) (float64, float64, int64, bool) + GetAllPremiumIndex() (map[string]market.PremiumIndex, error) GetAllFundRate() (map[string]float64, map[string]int64) GetSpotPrice(symbol string) (float64, bool) diff --git a/internal/data/market/future_price.go b/internal/data/market/future_price.go index 9b6427e..d414cdf 100644 --- a/internal/data/market/future_price.go +++ b/internal/data/market/future_price.go @@ -36,27 +36,50 @@ func (ms *MarketData) GetFuturePrice(symbol string) (float64, float64, int64, bo return markPrice, fundingRate, p.NextFundingTime, true } -func (ms *MarketData) GetAllFundRate() (map[string]float64, map[string]int64) { +type PremiumIndex struct { + MarkPrice float64 + FundingRate float64 + NextFundingTime int64 +} + +func (ms *MarketData) GetAllPremiumIndex() (map[string]PremiumIndex, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() premiums, err := ms.futuresClient.NewPremiumIndexService().Do(ctx) if err != nil { log.Error().Err(err).Msg("Failed to fetch all futures premium index") - return make(map[string]float64), make(map[string]int64) + return nil, err } - rates := make(map[string]float64, len(premiums)) - times := make(map[string]int64, len(premiums)) - + result := make(map[string]PremiumIndex, len(premiums)) for _, p := range premiums { - rate, err := strconv.ParseFloat(p.LastFundingRate, 64) + markPrice, err := strconv.ParseFloat(p.MarkPrice, 64) if err != nil { continue } - rates[p.Symbol] = rate - times[p.Symbol] = p.NextFundingTime + rate, _ := strconv.ParseFloat(p.LastFundingRate, 64) + result[p.Symbol] = PremiumIndex{ + MarkPrice: markPrice, + FundingRate: rate, + NextFundingTime: p.NextFundingTime, + } } + return result, nil +} + +func (ms *MarketData) GetAllFundRate() (map[string]float64, map[string]int64) { + all, err := ms.GetAllPremiumIndex() + if err != nil { + return make(map[string]float64), make(map[string]int64) + } + + rates := make(map[string]float64, len(all)) + times := make(map[string]int64, len(all)) + for sym, p := range all { + rates[sym] = p.FundingRate + times[sym] = p.NextFundingTime + } return rates, times } diff --git a/internal/services/controllers/market.go b/internal/services/controllers/market.go index bf30653..0f1adbe 100644 --- a/internal/services/controllers/market.go +++ b/internal/services/controllers/market.go @@ -13,15 +13,19 @@ func GetTopPrices() ([]string, []float64, []float64) { topPrice := make([]float64, n) topRate := make([]float64, n) + all, err := data.Market.GetAllPremiumIndex() + if err != nil { + return topSym, topPrice, topRate + } + for i, sym := range strategy.TopPriceSymbols { - price, rate, _, ok := data.Market.GetFuturePrice(sym) + p, ok := all[sym] if !ok { continue } topSym[i] = sym - topPrice[i] = price - topRate[i] = rate - + topPrice[i] = p.MarkPrice + topRate[i] = p.FundingRate } return topSym, topPrice, topRate } diff --git a/internal/services/tele/commands/market.go b/internal/services/tele/commands/market.go index c73c7ab..9e8e1b9 100644 --- a/internal/services/tele/commands/market.go +++ b/internal/services/tele/commands/market.go @@ -1,7 +1,11 @@ package commands import ( + "os" + "strconv" + "gopkg.in/telebot.v3" + "me.thuanle/bbot/internal/configs/key" "me.thuanle/bbot/internal/data" "me.thuanle/bbot/internal/services/controllers" "me.thuanle/bbot/internal/services/tele/chat" @@ -19,6 +23,10 @@ func OnGetTopFundingFee(context telebot.Context) error { } func OnRefreshPairCache(context telebot.Context) error { + adminID, _ := strconv.ParseInt(os.Getenv(key.AdminChatID), 10, 64) + if adminID != 0 && context.Sender().ID != adminID { + return nil + } data.Market.RefreshTradingPairCache() return chat.ReplyMessage(context, "Trading pair cache refreshed") } From 914beea5ce7e7e5bd7c16a5c79487a981df86fd4 Mon Sep 17 00:00:00 2001 From: thuanle Date: Sun, 26 Apr 2026 16:03:22 +0700 Subject: [PATCH 3/3] Address PR #15 round 2: fail-closed admin guard, sync init, error reporting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. /refresh now fail-closed: rejects all if ADMIN_CHAT_ID unset or invalid 2. Initial pair cache fill is synchronous — bot waits before accepting queries 3. /refresh reports failure when API fetch fails instead of always saying success Co-Authored-By: Claude Opus 4.7 --- internal/data/imarket.go | 2 +- internal/data/market/main.go | 3 +++ internal/data/market/trading_pairs.go | 11 ++++++----- internal/services/tele/commands/market.go | 8 +++++--- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/internal/data/imarket.go b/internal/data/imarket.go index 708a77d..a7d8a21 100644 --- a/internal/data/imarket.go +++ b/internal/data/imarket.go @@ -17,5 +17,5 @@ type IMarket interface { // Trading pair methods IsSpotPair(symbol string) bool IsFuturesPair(symbol string) bool - RefreshTradingPairCache() + RefreshTradingPairCache() error } diff --git a/internal/data/market/main.go b/internal/data/market/main.go index f6a4eee..df1418d 100644 --- a/internal/data/market/main.go +++ b/internal/data/market/main.go @@ -36,6 +36,9 @@ func NewMarketData() *MarketData { futuresClient: futures.NewClient("", ""), } + if err := ms.refreshTradingPairCache(); err != nil { + log.Error().Err(err).Msg("Failed initial trading pair cache load") + } go ms.pairCacheRefreshLoop() go ms.alphaCacheRefreshLoop() diff --git a/internal/data/market/trading_pairs.go b/internal/data/market/trading_pairs.go index 230b1c5..a064a9b 100644 --- a/internal/data/market/trading_pairs.go +++ b/internal/data/market/trading_pairs.go @@ -7,20 +7,20 @@ import ( "github.com/rs/zerolog/log" ) -func (ms *MarketData) refreshTradingPairCache() { +func (ms *MarketData) refreshTradingPairCache() error { ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() spotInfo, err := ms.spotClient.NewExchangeInfoService().Do(ctx) if err != nil { log.Error().Err(err).Msg("Failed to fetch spot exchange info") - return + return err } futuresInfo, err := ms.futuresClient.NewExchangeInfoService().Do(ctx) if err != nil { log.Error().Err(err).Msg("Failed to fetch futures exchange info") - return + return err } ms.pairCacheMutex.Lock() @@ -45,6 +45,7 @@ func (ms *MarketData) refreshTradingPairCache() { Int("spot", len(ms.spotPairs)). Int("futures", len(ms.futuresPairs)). Msg("Trading pair cache refreshed") + return nil } func (ms *MarketData) pairCacheRefreshLoop() { @@ -68,6 +69,6 @@ func (ms *MarketData) IsFuturesPair(symbol string) bool { return ms.futuresPairs[symbol] } -func (ms *MarketData) RefreshTradingPairCache() { - ms.refreshTradingPairCache() +func (ms *MarketData) RefreshTradingPairCache() error { + return ms.refreshTradingPairCache() } diff --git a/internal/services/tele/commands/market.go b/internal/services/tele/commands/market.go index 9e8e1b9..319a3c8 100644 --- a/internal/services/tele/commands/market.go +++ b/internal/services/tele/commands/market.go @@ -23,10 +23,12 @@ func OnGetTopFundingFee(context telebot.Context) error { } func OnRefreshPairCache(context telebot.Context) error { - adminID, _ := strconv.ParseInt(os.Getenv(key.AdminChatID), 10, 64) - if adminID != 0 && context.Sender().ID != adminID { + adminID, err := strconv.ParseInt(os.Getenv(key.AdminChatID), 10, 64) + if err != nil || adminID == 0 || context.Sender().ID != adminID { return nil } - data.Market.RefreshTradingPairCache() + if err := data.Market.RefreshTradingPairCache(); err != nil { + return chat.ReplyMessage(context, "Failed to refresh trading pair cache") + } return chat.ReplyMessage(context, "Trading pair cache refreshed") }