3 Commits

Author SHA1 Message Date
thuanle 391169761a Merge pull request 'perf: parallelize token source price requests' (#20) from feat/parallel-price-lookup-v2 into feat/token-message-rich
Reviewed-on: #20
2026-04-26 19:15:53 +07:00
thuanle a54ec0dbb7 Merge branch 'feat/token-message-rich' into feat/parallel-price-lookup-v2 2026-04-26 19:00:45 +07:00
thuanle a36eb7aad0 perf: run token price source lookups in parallel
Fetch alpha, futures, and spot sources concurrently in token lookup flow to reduce end-to-end response latency while preserving independent source behavior.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-26 18:26:33 +07:00
4 changed files with 131 additions and 228 deletions
-35
View File
@@ -1,35 +0,0 @@
package binancex
import (
"strings"
"me.thuanle/bbot/internal/data"
)
func Token2FutureSymbols(token string) []string {
return Token2Symbols(token)
}
func Token2SpotSymbols(token string) []string {
futureSymbols := Token2FutureSymbols(token)
spots := make([]string, 0, len(futureSymbols)+1)
seen := make(map[string]struct{}, len(futureSymbols)+1)
for _, futureSymbol := range futureSymbols {
spotSymbol := Future2SpotSymbol(futureSymbol)
if _, ok := seen[spotSymbol]; ok {
continue
}
seen[spotSymbol] = struct{}{}
spots = append(spots, spotSymbol)
}
spotOnly := strings.ToUpper(token) + "USDT"
if data.Market.IsSpotPair(spotOnly) {
if _, ok := seen[spotOnly]; !ok {
spots = append(spots, spotOnly)
}
}
return spots
}
-91
View File
@@ -1,91 +0,0 @@
package binancex
import (
"testing"
"me.thuanle/bbot/internal/data"
"me.thuanle/bbot/internal/data/market"
)
type resolverMarketStub struct {
spotPairs map[string]bool
futuresPairs map[string]bool
}
func (m *resolverMarketStub) GetFuturePrice(symbol string) (float64, float64, int64, bool) {
return 0, 0, 0, false
}
func (m *resolverMarketStub) GetAllPremiumIndex() (map[string]market.PremiumIndex, error) {
return nil, nil
}
func (m *resolverMarketStub) GetAllFundRate() (map[string]float64, map[string]int64) { return nil, nil }
func (m *resolverMarketStub) GetSpotPrice(symbol string) (float64, bool) { return 0, false }
func (m *resolverMarketStub) GetMarginInterestRates() map[string]float64 { return nil }
func (m *resolverMarketStub) IsAlphaToken(symbol string) bool { return false }
func (m *resolverMarketStub) GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool) {
return market.AlphaTokenInfo{}, false
}
func (m *resolverMarketStub) IsSpotPair(symbol string) bool { return m.spotPairs[symbol] }
func (m *resolverMarketStub) IsFuturesPair(symbol string) bool { return m.futuresPairs[symbol] }
func (m *resolverMarketStub) RefreshTradingPairCache() error { return nil }
func TestToken2FutureSymbols_ResolvesPrefixAndSuffix(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
data.Market = &resolverMarketStub{
futuresPairs: map[string]bool{
"1000PEPEUSDT": true,
"1000PEPEUSDC": true,
},
}
syms := Token2FutureSymbols("pepe")
if len(syms) == 0 {
t.Fatalf("expected future symbols for PEPE")
}
foundUSDT := false
foundUSDC := false
for _, sym := range syms {
if sym == "1000PEPEUSDT" {
foundUSDT = true
}
if sym == "1000PEPEUSDC" {
foundUSDC = true
}
}
if !foundUSDT || !foundUSDC {
t.Fatalf("expected both 1000PEPEUSDT and 1000PEPEUSDC, got %+v", syms)
}
}
func TestToken2SpotSymbols_AppliesExplicitRemap(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
data.Market = &resolverMarketStub{
futuresPairs: map[string]bool{"LUNA2USDT": true},
spotPairs: map[string]bool{"LUNAUSDT": true},
}
spots := Token2SpotSymbols("luna2")
if len(spots) != 1 || spots[0] != "LUNAUSDT" {
t.Fatalf("expected [LUNAUSDT], got %+v", spots)
}
}
func TestToken2SpotSymbols_SpotOnlyFallback(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
data.Market = &resolverMarketStub{
spotPairs: map[string]bool{"ABCUSDT": true},
}
spots := Token2SpotSymbols("abc")
if len(spots) != 1 || spots[0] != "ABCUSDT" {
t.Fatalf("expected [ABCUSDT], got %+v", spots)
}
}
+54 -32
View File
@@ -2,11 +2,11 @@ package commands
import (
"strings"
"sync"
"gopkg.in/telebot.v3"
"me.thuanle/bbot/internal/configs/tele"
"me.thuanle/bbot/internal/data"
"me.thuanle/bbot/internal/helper/binancex"
"me.thuanle/bbot/internal/services/tele/chat"
"me.thuanle/bbot/internal/services/tele/view"
)
@@ -65,41 +65,63 @@ func showStickerMode(context telebot.Context, token string) {
func collectRichTokenData(token string) buildRichTokenMessageArgs {
a := buildRichTokenMessageArgs{Token: token}
if alphaToken, ok := data.Market.GetAlphaToken(token); ok {
a.HasAlpha = true
a.HasAlpha24h = true
a.Alpha24h = alphaToken.GetPercentChange24h()
if alphaPrice, ok := data.Market.GetAlphaPrice(token + "USDT"); ok {
a.AlphaPrice = alphaPrice
} else {
a.AlphaPrice = alphaToken.GetPrice()
}
}
futureSymbols := binancex.Token2FutureSymbols(token)
for _, futureSymbol := range futureSymbols {
if fp, fr, ft, ok := data.Market.GetFuturePrice(futureSymbol); ok {
a.HasFuture = true
a.FuturePrice = fp
a.FundingRate = fr
a.FundingTimeMs = ft
break
}
}
spotSymbols := binancex.Token2SpotSymbols(token)
for _, spotSymbol := range spotSymbols {
if sp, ok := data.Market.GetSpotPrice(spotSymbol); ok {
a.HasSpot = true
a.SpotPrice = sp
break
}
}
marginRates := data.Market.GetMarginInterestRates()
a.MarginAPRPercent = marginRates[token] * 365 * 100
a.HasMarginAPR = marginRates[token] != 0
futureSymbol := token + "USDT"
spotSymbol := token + "USDT"
var (
wg sync.WaitGroup
mu sync.Mutex
)
wg.Add(3)
go func() {
defer wg.Done()
if alphaToken, ok := data.Market.GetAlphaToken(token); ok {
alpha24h := alphaToken.GetPercentChange24h()
alphaPrice := alphaToken.GetPrice()
if p, ok := data.Market.GetAlphaPrice(token + "USDT"); ok {
alphaPrice = p
}
mu.Lock()
a.HasAlpha = true
a.HasAlpha24h = true
a.Alpha24h = alpha24h
a.AlphaPrice = alphaPrice
mu.Unlock()
}
}()
go func() {
defer wg.Done()
if data.Market.IsFuturesPair(futureSymbol) {
if fp, fr, ft, ok := data.Market.GetFuturePrice(futureSymbol); ok {
mu.Lock()
a.HasFuture = true
a.FuturePrice = fp
a.FundingRate = fr
a.FundingTimeMs = ft
mu.Unlock()
}
}
}()
go func() {
defer wg.Done()
if data.Market.IsSpotPair(spotSymbol) {
if sp, ok := data.Market.GetSpotPrice(spotSymbol); ok {
mu.Lock()
a.HasSpot = true
a.SpotPrice = sp
mu.Unlock()
}
}
}()
wg.Wait()
return a
}
+77 -70
View File
@@ -1,7 +1,9 @@
package commands
import (
"sync"
"testing"
"time"
"me.thuanle/bbot/internal/data"
"me.thuanle/bbot/internal/data/market"
@@ -58,9 +60,27 @@ type marketStub struct {
alphaTokens map[string]market.AlphaTokenInfo
alphaPrices map[string]float64
marginRates map[string]float64
gate *sync.WaitGroup
ready chan struct{}
done chan struct{}
mu sync.Mutex
starts int
}
func (m *marketStub) waitConcurrentGate() {
if m.gate == nil {
return
}
m.mu.Lock()
m.starts++
m.mu.Unlock()
m.gate.Done()
<-m.ready
}
func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, bool) {
m.waitConcurrentGate()
v, ok := m.futurePrices[symbol]
if !ok {
return 0, 0, 0, false
@@ -70,6 +90,7 @@ func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, boo
func (m *marketStub) GetAllPremiumIndex() (map[string]market.PremiumIndex, error) { return nil, nil }
func (m *marketStub) GetAllFundRate() (map[string]float64, map[string]int64) { return nil, nil }
func (m *marketStub) GetSpotPrice(symbol string) (float64, bool) {
m.waitConcurrentGate()
v, ok := m.spotPrices[symbol]
return v, ok
}
@@ -83,6 +104,7 @@ func (m *marketStub) GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool)
return v, ok
}
func (m *marketStub) GetAlphaPrice(symbol string) (float64, bool) {
m.waitConcurrentGate()
v, ok := m.alphaPrices[symbol]
return v, ok
}
@@ -113,15 +135,9 @@ func TestCollectRichTokenData_FutureFailureStillKeepsSpot(t *testing.T) {
defer func() { data.Market = orig }()
data.Market = &marketStub{
spotPairs: map[string]bool{
"ETHUSDT": true,
},
futuresPairs: map[string]bool{
"ETHUSDT": true,
},
spotPrices: map[string]float64{
"ETHUSDT": 3245,
},
spotPairs: map[string]bool{"ETHUSDT": true},
futuresPairs: map[string]bool{"ETHUSDT": true},
spotPrices: map[string]float64{"ETHUSDT": 3245},
futurePrices: map[string]struct {
price float64
rate float64
@@ -138,67 +154,6 @@ func TestCollectRichTokenData_FutureFailureStillKeepsSpot(t *testing.T) {
}
}
func TestCollectRichTokenData_UsesSharedResolverMapping(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
data.Market = &marketStub{
spotPairs: map[string]bool{
"LUNAUSDT": true,
},
futuresPairs: map[string]bool{
"LUNA2USDT": true,
},
spotPrices: map[string]float64{
"LUNAUSDT": 0.49,
},
futurePrices: map[string]struct {
price float64
rate float64
time int64
}{
"LUNA2USDT": {price: 0.50, rate: 0.0001, time: 1740000000000},
},
}
args := collectRichTokenData("LUNA2")
if !args.HasFuture || !args.HasSpot {
t.Fatalf("expected both future and mapped spot, got %+v", args)
}
if args.SpotPrice != 0.49 {
t.Fatalf("expected mapped spot price 0.49, got %f", args.SpotPrice)
}
}
func TestCollectRichTokenData_PrefixFutureMapsToSpot(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
data.Market = &marketStub{
spotPairs: map[string]bool{
"PEPEUSDT": true,
},
futuresPairs: map[string]bool{
"1000PEPEUSDT": true,
},
spotPrices: map[string]float64{
"PEPEUSDT": 0.000012,
},
futurePrices: map[string]struct {
price float64
rate float64
time int64
}{
"1000PEPEUSDT": {price: 0.000013, rate: 0.0002, time: 1740000000000},
},
}
args := collectRichTokenData("PEPE")
if !args.HasFuture || !args.HasSpot {
t.Fatalf("expected both future and mapped spot for prefixed contract, got %+v", args)
}
}
func TestCollectRichTokenData_AlphaUsesSymbolPrice(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
@@ -221,3 +176,55 @@ func TestCollectRichTokenData_AlphaUsesSymbolPrice(t *testing.T) {
t.Fatalf("expected alpha price from symbol lookup, got %.4f", args.AlphaPrice)
}
}
func TestCollectRichTokenData_RunsLookupsConcurrently(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
gate := &sync.WaitGroup{}
gate.Add(3)
st := &marketStub{
spotPairs: map[string]bool{"ABCUSDT": true},
futuresPairs: map[string]bool{"ABCUSDT": true},
spotPrices: map[string]float64{"ABCUSDT": 1.23},
futurePrices: map[string]struct {
price float64
rate float64
time int64
}{"ABCUSDT": {price: 1.24, rate: 0.0001, time: 1740000000000}},
alphaTokens: map[string]market.AlphaTokenInfo{
"ABC": {Symbol: "ABC", PercentChange24h: "2.5", Price: "1.22"},
},
alphaPrices: map[string]float64{"ABCUSDT": 1.22},
gate: gate,
ready: make(chan struct{}),
}
data.Market = st
finished := make(chan buildRichTokenMessageArgs, 1)
go func() {
finished <- collectRichTokenData("ABC")
}()
waitDone := make(chan struct{})
go func() {
gate.Wait()
close(waitDone)
}()
select {
case <-waitDone:
close(st.ready)
case <-time.After(200 * time.Millisecond):
t.Fatalf("expected price lookups to run concurrently")
}
select {
case args := <-finished:
if !args.HasSpot || !args.HasFuture || !args.HasAlpha {
t.Fatalf("expected all lookups present: %+v", args)
}
case <-time.After(200 * time.Millisecond):
t.Fatalf("collectRichTokenData did not finish")
}
}