perf: run token price source lookups in parallel
Fetch alpha, futures, and spot sources concurrently in token lookup flow to reduce end-to-end response latency while preserving independent source behavior. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"me.thuanle/bbot/internal/data"
|
||||
"me.thuanle/bbot/internal/data/market"
|
||||
@@ -58,9 +60,27 @@ type marketStub struct {
|
||||
alphaTokens map[string]market.AlphaTokenInfo
|
||||
alphaPrices map[string]float64
|
||||
marginRates map[string]float64
|
||||
|
||||
gate *sync.WaitGroup
|
||||
ready chan struct{}
|
||||
done chan struct{}
|
||||
mu sync.Mutex
|
||||
starts int
|
||||
}
|
||||
|
||||
func (m *marketStub) waitConcurrentGate() {
|
||||
if m.gate == nil {
|
||||
return
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.starts++
|
||||
m.mu.Unlock()
|
||||
m.gate.Done()
|
||||
<-m.ready
|
||||
}
|
||||
|
||||
func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, bool) {
|
||||
m.waitConcurrentGate()
|
||||
v, ok := m.futurePrices[symbol]
|
||||
if !ok {
|
||||
return 0, 0, 0, false
|
||||
@@ -70,6 +90,7 @@ func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, boo
|
||||
func (m *marketStub) GetAllPremiumIndex() (map[string]market.PremiumIndex, error) { return nil, nil }
|
||||
func (m *marketStub) GetAllFundRate() (map[string]float64, map[string]int64) { return nil, nil }
|
||||
func (m *marketStub) GetSpotPrice(symbol string) (float64, bool) {
|
||||
m.waitConcurrentGate()
|
||||
v, ok := m.spotPrices[symbol]
|
||||
return v, ok
|
||||
}
|
||||
@@ -83,6 +104,7 @@ func (m *marketStub) GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool)
|
||||
return v, ok
|
||||
}
|
||||
func (m *marketStub) GetAlphaPrice(symbol string) (float64, bool) {
|
||||
m.waitConcurrentGate()
|
||||
v, ok := m.alphaPrices[symbol]
|
||||
return v, ok
|
||||
}
|
||||
@@ -154,3 +176,55 @@ func TestCollectRichTokenData_AlphaUsesSymbolPrice(t *testing.T) {
|
||||
t.Fatalf("expected alpha price from symbol lookup, got %.4f", args.AlphaPrice)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectRichTokenData_RunsLookupsConcurrently(t *testing.T) {
|
||||
orig := data.Market
|
||||
defer func() { data.Market = orig }()
|
||||
|
||||
gate := &sync.WaitGroup{}
|
||||
gate.Add(3)
|
||||
st := &marketStub{
|
||||
spotPairs: map[string]bool{"ABCUSDT": true},
|
||||
futuresPairs: map[string]bool{"ABCUSDT": true},
|
||||
spotPrices: map[string]float64{"ABCUSDT": 1.23},
|
||||
futurePrices: map[string]struct {
|
||||
price float64
|
||||
rate float64
|
||||
time int64
|
||||
}{"ABCUSDT": {price: 1.24, rate: 0.0001, time: 1740000000000}},
|
||||
alphaTokens: map[string]market.AlphaTokenInfo{
|
||||
"ABC": {Symbol: "ABC", PercentChange24h: "2.5", Price: "1.22"},
|
||||
},
|
||||
alphaPrices: map[string]float64{"ABCUSDT": 1.22},
|
||||
gate: gate,
|
||||
ready: make(chan struct{}),
|
||||
}
|
||||
data.Market = st
|
||||
|
||||
finished := make(chan buildRichTokenMessageArgs, 1)
|
||||
go func() {
|
||||
finished <- collectRichTokenData("ABC")
|
||||
}()
|
||||
|
||||
waitDone := make(chan struct{})
|
||||
go func() {
|
||||
gate.Wait()
|
||||
close(waitDone)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-waitDone:
|
||||
close(st.ready)
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatalf("expected price lookups to run concurrently")
|
||||
}
|
||||
|
||||
select {
|
||||
case args := <-finished:
|
||||
if !args.HasSpot || !args.HasFuture || !args.HasAlpha {
|
||||
t.Fatalf("expected all lookups present: %+v", args)
|
||||
}
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatalf("collectRichTokenData did not finish")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user