3 Commits

Author SHA1 Message Date
thuanle 391169761a Merge pull request 'perf: parallelize token source price requests' (#20) from feat/parallel-price-lookup-v2 into feat/token-message-rich
Reviewed-on: #20
2026-04-26 19:15:53 +07:00
thuanle a54ec0dbb7 Merge branch 'feat/token-message-rich' into feat/parallel-price-lookup-v2 2026-04-26 19:00:45 +07:00
thuanle a36eb7aad0 perf: run token price source lookups in parallel
Fetch alpha, futures, and spot sources concurrently in token lookup flow to reduce end-to-end response latency while preserving independent source behavior.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-26 18:26:33 +07:00
2 changed files with 128 additions and 29 deletions
+54 -29
View File
@@ -2,6 +2,7 @@ package commands
import (
"strings"
"sync"
"gopkg.in/telebot.v3"
"me.thuanle/bbot/internal/configs/tele"
@@ -64,39 +65,63 @@ func showStickerMode(context telebot.Context, token string) {
func collectRichTokenData(token string) buildRichTokenMessageArgs {
a := buildRichTokenMessageArgs{Token: token}
if alphaToken, ok := data.Market.GetAlphaToken(token); ok {
a.HasAlpha = true
a.HasAlpha24h = true
a.Alpha24h = alphaToken.GetPercentChange24h()
if alphaPrice, ok := data.Market.GetAlphaPrice(token + "USDT"); ok {
a.AlphaPrice = alphaPrice
} else {
a.AlphaPrice = alphaToken.GetPrice()
}
}
futureSymbol := token + "USDT"
if data.Market.IsFuturesPair(futureSymbol) {
if fp, fr, ft, ok := data.Market.GetFuturePrice(futureSymbol); ok {
a.HasFuture = true
a.FuturePrice = fp
a.FundingRate = fr
a.FundingTimeMs = ft
}
}
spotSymbol := token + "USDT"
if data.Market.IsSpotPair(spotSymbol) {
if sp, ok := data.Market.GetSpotPrice(spotSymbol); ok {
a.HasSpot = true
a.SpotPrice = sp
}
}
marginRates := data.Market.GetMarginInterestRates()
a.MarginAPRPercent = marginRates[token] * 365 * 100
a.HasMarginAPR = marginRates[token] != 0
futureSymbol := token + "USDT"
spotSymbol := token + "USDT"
var (
wg sync.WaitGroup
mu sync.Mutex
)
wg.Add(3)
go func() {
defer wg.Done()
if alphaToken, ok := data.Market.GetAlphaToken(token); ok {
alpha24h := alphaToken.GetPercentChange24h()
alphaPrice := alphaToken.GetPrice()
if p, ok := data.Market.GetAlphaPrice(token + "USDT"); ok {
alphaPrice = p
}
mu.Lock()
a.HasAlpha = true
a.HasAlpha24h = true
a.Alpha24h = alpha24h
a.AlphaPrice = alphaPrice
mu.Unlock()
}
}()
go func() {
defer wg.Done()
if data.Market.IsFuturesPair(futureSymbol) {
if fp, fr, ft, ok := data.Market.GetFuturePrice(futureSymbol); ok {
mu.Lock()
a.HasFuture = true
a.FuturePrice = fp
a.FundingRate = fr
a.FundingTimeMs = ft
mu.Unlock()
}
}
}()
go func() {
defer wg.Done()
if data.Market.IsSpotPair(spotSymbol) {
if sp, ok := data.Market.GetSpotPrice(spotSymbol); ok {
mu.Lock()
a.HasSpot = true
a.SpotPrice = sp
mu.Unlock()
}
}
}()
wg.Wait()
return a
}
@@ -1,7 +1,9 @@
package commands
import (
"sync"
"testing"
"time"
"me.thuanle/bbot/internal/data"
"me.thuanle/bbot/internal/data/market"
@@ -58,9 +60,27 @@ type marketStub struct {
alphaTokens map[string]market.AlphaTokenInfo
alphaPrices map[string]float64
marginRates map[string]float64
gate *sync.WaitGroup
ready chan struct{}
done chan struct{}
mu sync.Mutex
starts int
}
func (m *marketStub) waitConcurrentGate() {
if m.gate == nil {
return
}
m.mu.Lock()
m.starts++
m.mu.Unlock()
m.gate.Done()
<-m.ready
}
func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, bool) {
m.waitConcurrentGate()
v, ok := m.futurePrices[symbol]
if !ok {
return 0, 0, 0, false
@@ -70,6 +90,7 @@ func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, boo
func (m *marketStub) GetAllPremiumIndex() (map[string]market.PremiumIndex, error) { return nil, nil }
func (m *marketStub) GetAllFundRate() (map[string]float64, map[string]int64) { return nil, nil }
func (m *marketStub) GetSpotPrice(symbol string) (float64, bool) {
m.waitConcurrentGate()
v, ok := m.spotPrices[symbol]
return v, ok
}
@@ -83,6 +104,7 @@ func (m *marketStub) GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool)
return v, ok
}
func (m *marketStub) GetAlphaPrice(symbol string) (float64, bool) {
m.waitConcurrentGate()
v, ok := m.alphaPrices[symbol]
return v, ok
}
@@ -154,3 +176,55 @@ func TestCollectRichTokenData_AlphaUsesSymbolPrice(t *testing.T) {
t.Fatalf("expected alpha price from symbol lookup, got %.4f", args.AlphaPrice)
}
}
func TestCollectRichTokenData_RunsLookupsConcurrently(t *testing.T) {
orig := data.Market
defer func() { data.Market = orig }()
gate := &sync.WaitGroup{}
gate.Add(3)
st := &marketStub{
spotPairs: map[string]bool{"ABCUSDT": true},
futuresPairs: map[string]bool{"ABCUSDT": true},
spotPrices: map[string]float64{"ABCUSDT": 1.23},
futurePrices: map[string]struct {
price float64
rate float64
time int64
}{"ABCUSDT": {price: 1.24, rate: 0.0001, time: 1740000000000}},
alphaTokens: map[string]market.AlphaTokenInfo{
"ABC": {Symbol: "ABC", PercentChange24h: "2.5", Price: "1.22"},
},
alphaPrices: map[string]float64{"ABCUSDT": 1.22},
gate: gate,
ready: make(chan struct{}),
}
data.Market = st
finished := make(chan buildRichTokenMessageArgs, 1)
go func() {
finished <- collectRichTokenData("ABC")
}()
waitDone := make(chan struct{})
go func() {
gate.Wait()
close(waitDone)
}()
select {
case <-waitDone:
close(st.ready)
case <-time.After(200 * time.Millisecond):
t.Fatalf("expected price lookups to run concurrently")
}
select {
case args := <-finished:
if !args.HasSpot || !args.HasFuture || !args.HasAlpha {
t.Fatalf("expected all lookups present: %+v", args)
}
case <-time.After(200 * time.Millisecond):
t.Fatalf("collectRichTokenData did not finish")
}
}