Merge pull request 'perf: parallelize token source price requests' (#20) from feat/parallel-price-lookup-v2 into feat/token-message-rich
Reviewed-on: #20
This commit was merged in pull request #20.
This commit is contained in:
@@ -2,6 +2,7 @@ package commands
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"gopkg.in/telebot.v3"
|
"gopkg.in/telebot.v3"
|
||||||
"me.thuanle/bbot/internal/configs/tele"
|
"me.thuanle/bbot/internal/configs/tele"
|
||||||
@@ -64,39 +65,63 @@ func showStickerMode(context telebot.Context, token string) {
|
|||||||
func collectRichTokenData(token string) buildRichTokenMessageArgs {
|
func collectRichTokenData(token string) buildRichTokenMessageArgs {
|
||||||
a := buildRichTokenMessageArgs{Token: token}
|
a := buildRichTokenMessageArgs{Token: token}
|
||||||
|
|
||||||
if alphaToken, ok := data.Market.GetAlphaToken(token); ok {
|
|
||||||
a.HasAlpha = true
|
|
||||||
a.HasAlpha24h = true
|
|
||||||
a.Alpha24h = alphaToken.GetPercentChange24h()
|
|
||||||
if alphaPrice, ok := data.Market.GetAlphaPrice(token + "USDT"); ok {
|
|
||||||
a.AlphaPrice = alphaPrice
|
|
||||||
} else {
|
|
||||||
a.AlphaPrice = alphaToken.GetPrice()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
futureSymbol := token + "USDT"
|
|
||||||
if data.Market.IsFuturesPair(futureSymbol) {
|
|
||||||
if fp, fr, ft, ok := data.Market.GetFuturePrice(futureSymbol); ok {
|
|
||||||
a.HasFuture = true
|
|
||||||
a.FuturePrice = fp
|
|
||||||
a.FundingRate = fr
|
|
||||||
a.FundingTimeMs = ft
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
spotSymbol := token + "USDT"
|
|
||||||
if data.Market.IsSpotPair(spotSymbol) {
|
|
||||||
if sp, ok := data.Market.GetSpotPrice(spotSymbol); ok {
|
|
||||||
a.HasSpot = true
|
|
||||||
a.SpotPrice = sp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
marginRates := data.Market.GetMarginInterestRates()
|
marginRates := data.Market.GetMarginInterestRates()
|
||||||
a.MarginAPRPercent = marginRates[token] * 365 * 100
|
a.MarginAPRPercent = marginRates[token] * 365 * 100
|
||||||
a.HasMarginAPR = marginRates[token] != 0
|
a.HasMarginAPR = marginRates[token] != 0
|
||||||
|
|
||||||
|
futureSymbol := token + "USDT"
|
||||||
|
spotSymbol := token + "USDT"
|
||||||
|
|
||||||
|
var (
|
||||||
|
wg sync.WaitGroup
|
||||||
|
mu sync.Mutex
|
||||||
|
)
|
||||||
|
wg.Add(3)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if alphaToken, ok := data.Market.GetAlphaToken(token); ok {
|
||||||
|
alpha24h := alphaToken.GetPercentChange24h()
|
||||||
|
alphaPrice := alphaToken.GetPrice()
|
||||||
|
if p, ok := data.Market.GetAlphaPrice(token + "USDT"); ok {
|
||||||
|
alphaPrice = p
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
a.HasAlpha = true
|
||||||
|
a.HasAlpha24h = true
|
||||||
|
a.Alpha24h = alpha24h
|
||||||
|
a.AlphaPrice = alphaPrice
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if data.Market.IsFuturesPair(futureSymbol) {
|
||||||
|
if fp, fr, ft, ok := data.Market.GetFuturePrice(futureSymbol); ok {
|
||||||
|
mu.Lock()
|
||||||
|
a.HasFuture = true
|
||||||
|
a.FuturePrice = fp
|
||||||
|
a.FundingRate = fr
|
||||||
|
a.FundingTimeMs = ft
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if data.Market.IsSpotPair(spotSymbol) {
|
||||||
|
if sp, ok := data.Market.GetSpotPrice(spotSymbol); ok {
|
||||||
|
mu.Lock()
|
||||||
|
a.HasSpot = true
|
||||||
|
a.SpotPrice = sp
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
package commands
|
package commands
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"me.thuanle/bbot/internal/data"
|
"me.thuanle/bbot/internal/data"
|
||||||
"me.thuanle/bbot/internal/data/market"
|
"me.thuanle/bbot/internal/data/market"
|
||||||
@@ -58,9 +60,27 @@ type marketStub struct {
|
|||||||
alphaTokens map[string]market.AlphaTokenInfo
|
alphaTokens map[string]market.AlphaTokenInfo
|
||||||
alphaPrices map[string]float64
|
alphaPrices map[string]float64
|
||||||
marginRates map[string]float64
|
marginRates map[string]float64
|
||||||
|
|
||||||
|
gate *sync.WaitGroup
|
||||||
|
ready chan struct{}
|
||||||
|
done chan struct{}
|
||||||
|
mu sync.Mutex
|
||||||
|
starts int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *marketStub) waitConcurrentGate() {
|
||||||
|
if m.gate == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.mu.Lock()
|
||||||
|
m.starts++
|
||||||
|
m.mu.Unlock()
|
||||||
|
m.gate.Done()
|
||||||
|
<-m.ready
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, bool) {
|
func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, bool) {
|
||||||
|
m.waitConcurrentGate()
|
||||||
v, ok := m.futurePrices[symbol]
|
v, ok := m.futurePrices[symbol]
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, 0, 0, false
|
return 0, 0, 0, false
|
||||||
@@ -70,6 +90,7 @@ func (m *marketStub) GetFuturePrice(symbol string) (float64, float64, int64, boo
|
|||||||
func (m *marketStub) GetAllPremiumIndex() (map[string]market.PremiumIndex, error) { return nil, nil }
|
func (m *marketStub) GetAllPremiumIndex() (map[string]market.PremiumIndex, error) { return nil, nil }
|
||||||
func (m *marketStub) GetAllFundRate() (map[string]float64, map[string]int64) { return nil, nil }
|
func (m *marketStub) GetAllFundRate() (map[string]float64, map[string]int64) { return nil, nil }
|
||||||
func (m *marketStub) GetSpotPrice(symbol string) (float64, bool) {
|
func (m *marketStub) GetSpotPrice(symbol string) (float64, bool) {
|
||||||
|
m.waitConcurrentGate()
|
||||||
v, ok := m.spotPrices[symbol]
|
v, ok := m.spotPrices[symbol]
|
||||||
return v, ok
|
return v, ok
|
||||||
}
|
}
|
||||||
@@ -83,6 +104,7 @@ func (m *marketStub) GetAlphaToken(symbol string) (market.AlphaTokenInfo, bool)
|
|||||||
return v, ok
|
return v, ok
|
||||||
}
|
}
|
||||||
func (m *marketStub) GetAlphaPrice(symbol string) (float64, bool) {
|
func (m *marketStub) GetAlphaPrice(symbol string) (float64, bool) {
|
||||||
|
m.waitConcurrentGate()
|
||||||
v, ok := m.alphaPrices[symbol]
|
v, ok := m.alphaPrices[symbol]
|
||||||
return v, ok
|
return v, ok
|
||||||
}
|
}
|
||||||
@@ -154,3 +176,55 @@ func TestCollectRichTokenData_AlphaUsesSymbolPrice(t *testing.T) {
|
|||||||
t.Fatalf("expected alpha price from symbol lookup, got %.4f", args.AlphaPrice)
|
t.Fatalf("expected alpha price from symbol lookup, got %.4f", args.AlphaPrice)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCollectRichTokenData_RunsLookupsConcurrently(t *testing.T) {
|
||||||
|
orig := data.Market
|
||||||
|
defer func() { data.Market = orig }()
|
||||||
|
|
||||||
|
gate := &sync.WaitGroup{}
|
||||||
|
gate.Add(3)
|
||||||
|
st := &marketStub{
|
||||||
|
spotPairs: map[string]bool{"ABCUSDT": true},
|
||||||
|
futuresPairs: map[string]bool{"ABCUSDT": true},
|
||||||
|
spotPrices: map[string]float64{"ABCUSDT": 1.23},
|
||||||
|
futurePrices: map[string]struct {
|
||||||
|
price float64
|
||||||
|
rate float64
|
||||||
|
time int64
|
||||||
|
}{"ABCUSDT": {price: 1.24, rate: 0.0001, time: 1740000000000}},
|
||||||
|
alphaTokens: map[string]market.AlphaTokenInfo{
|
||||||
|
"ABC": {Symbol: "ABC", PercentChange24h: "2.5", Price: "1.22"},
|
||||||
|
},
|
||||||
|
alphaPrices: map[string]float64{"ABCUSDT": 1.22},
|
||||||
|
gate: gate,
|
||||||
|
ready: make(chan struct{}),
|
||||||
|
}
|
||||||
|
data.Market = st
|
||||||
|
|
||||||
|
finished := make(chan buildRichTokenMessageArgs, 1)
|
||||||
|
go func() {
|
||||||
|
finished <- collectRichTokenData("ABC")
|
||||||
|
}()
|
||||||
|
|
||||||
|
waitDone := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
gate.Wait()
|
||||||
|
close(waitDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-waitDone:
|
||||||
|
close(st.ready)
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
t.Fatalf("expected price lookups to run concurrently")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case args := <-finished:
|
||||||
|
if !args.HasSpot || !args.HasFuture || !args.HasAlpha {
|
||||||
|
t.Fatalf("expected all lookups present: %+v", args)
|
||||||
|
}
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
t.Fatalf("collectRichTokenData did not finish")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user