Fix concurrent map read/write race condition in MarketData

Add sync.RWMutex to protect future and spot price maps accessed by
WebSocket handlers (writers) and Telegram bot handlers (readers).
Also adds Alpha token support with caching.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-25 00:15:22 +07:00
parent 248d3153e7
commit 440fa3bacd
9 changed files with 297 additions and 31 deletions
+179
View File
@@ -0,0 +1,179 @@
package market
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"github.com/rs/zerolog/log"
)
// AlphaTokenInfo represents the response from Binance Alpha token API
type AlphaTokenInfo struct {
TokenID string `json:"tokenId"`
ChainID string `json:"chainId"`
ChainIconURL string `json:"chainIconUrl"`
ChainName string `json:"chainName"`
ContractAddress string `json:"contractAddress"`
Name string `json:"name"`
Symbol string `json:"symbol"`
IconURL string `json:"iconUrl"`
Price string `json:"price"`
PercentChange24h string `json:"percentChange24h"`
Volume24h string `json:"volume24h"`
MarketCap string `json:"marketCap"`
FDV string `json:"fdv"`
Liquidity string `json:"liquidity"`
TotalSupply string `json:"totalSupply"`
CirculatingSupply string `json:"circulatingSupply"`
Holders string `json:"holders"`
Decimals int `json:"decimals"`
ListingCex bool `json:"listingCex"`
HotTag bool `json:"hotTag"`
CexCoinName string `json:"cexCoinName"`
CanTransfer bool `json:"canTransfer"`
Denomination int `json:"denomination"`
Offline bool `json:"offline"`
TradeDecimal int `json:"tradeDecimal"`
AlphaID string `json:"alphaId"`
Offsell bool `json:"offsell"`
PriceHigh24h string `json:"priceHigh24h"`
PriceLow24h string `json:"priceLow24h"`
Count24h string `json:"count24h"`
OnlineTge bool `json:"onlineTge"`
OnlineAirdrop bool `json:"onlineAirdrop"`
Score int `json:"score"`
CexOffDisplay bool `json:"cexOffDisplay"`
StockState bool `json:"stockState"`
ListingTime int64 `json:"listingTime"`
MulPoint int `json:"mulPoint"`
BnExclusiveState bool `json:"bnExclusiveState"`
}
// AlphaTokenResponse represents the API response structure
type AlphaTokenResponse struct {
Code string `json:"code"`
Message *string `json:"message"`
MessageDetail *string `json:"messageDetail"`
Data []AlphaTokenInfo `json:"data"`
}
// GetPrice returns the price as float64
func (a *AlphaTokenInfo) GetPrice() float64 {
price, err := strconv.ParseFloat(a.Price, 64)
if err != nil {
log.Error().Err(err).Str("symbol", a.Symbol).Msg("Failed to parse Alpha token price")
return 0
}
return price
}
// GetPercentChange24h returns the 24h percentage change as float64
func (a *AlphaTokenInfo) GetPercentChange24h() float64 {
change, err := strconv.ParseFloat(a.PercentChange24h, 64)
if err != nil {
log.Error().Err(err).Str("symbol", a.Symbol).Msg("Failed to parse Alpha token 24h change")
return 0
}
return change
}
// fetchAlphaTokens fetches Alpha tokens from Binance API
func (ms *MarketData) fetchAlphaTokens() ([]AlphaTokenInfo, error) {
const alphaTokenURL = "https://www.binance.com/bapi/defi/v1/public/wallet-direct/buw/wallet/cex/alpha/all/token/list"
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Get(alphaTokenURL)
if err != nil {
return nil, fmt.Errorf("failed to fetch Alpha tokens: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Alpha token API returned status: %d", resp.StatusCode)
}
var tokenResponse AlphaTokenResponse
if err := json.NewDecoder(resp.Body).Decode(&tokenResponse); err != nil {
return nil, fmt.Errorf("failed to decode Alpha token response: %w", err)
}
if tokenResponse.Code != "000000" {
return nil, fmt.Errorf("Alpha token API returned error code: %s", tokenResponse.Code)
}
return tokenResponse.Data, nil
}
// refreshAlphaTokenCache refreshes the Alpha token cache
func (ms *MarketData) refreshAlphaTokenCache() {
ms.alphaCacheMutex.Lock()
defer ms.alphaCacheMutex.Unlock()
log.Info().Msg("Refreshing Alpha token cache")
tokens, err := ms.fetchAlphaTokens()
if err != nil {
log.Error().Err(err).Msg("Failed to refresh Alpha token cache")
return
}
// Clear existing cache
ms.alphaTokens = make(map[string]AlphaTokenInfo)
// Populate cache with new data
for _, token := range tokens {
symbol := token.Symbol // Already uppercase from API
ms.alphaTokens[symbol] = token
}
// Update spot prices for Alpha tokens (separate lock to avoid holding both mutexes)
ms.mu.Lock()
for _, token := range tokens {
symbol := token.Symbol
if price := token.GetPrice(); price > 0 {
ms.spotPrice[symbol] = price
}
}
ms.mu.Unlock()
ms.lastAlphaCacheUpdate = time.Now()
log.Info().Int("count", len(tokens)).Msg("Alpha token cache refreshed successfully")
}
// GetAlphaToken returns Alpha token info by symbol
func (ms *MarketData) GetAlphaToken(symbol string) (AlphaTokenInfo, bool) {
ms.alphaCacheMutex.RLock()
defer ms.alphaCacheMutex.RUnlock()
token, exists := ms.alphaTokens[symbol]
return token, exists
}
// IsAlphaToken checks if a symbol is an Alpha token
func (ms *MarketData) IsAlphaToken(symbol string) bool {
_, exists := ms.GetAlphaToken(symbol)
return exists
}
// shouldRefreshAlphaCache checks if the Alpha cache should be refreshed
func (ms *MarketData) shouldRefreshAlphaCache() bool {
// Refresh if cache is empty (first time) or if it's older than 30 minutes
if len(ms.alphaTokens) == 0 {
return true
}
return time.Since(ms.lastAlphaCacheUpdate) > 30*time.Minute
}
// ensureAlphaCacheLoaded ensures Alpha cache is loaded, refreshes if needed
func (ms *MarketData) ensureAlphaCacheLoaded() {
if ms.shouldRefreshAlphaCache() {
ms.refreshAlphaTokenCache()
}
}
+15 -1
View File
@@ -7,6 +7,8 @@ import (
)
func (ms *MarketData) GetFuturePrice(symbol string) (float64, float64, int64, bool) {
ms.mu.RLock()
defer ms.mu.RUnlock()
p, ok := ms.futureMarkPrice[symbol]
if !ok {
return 0, 0, 0, false
@@ -23,6 +25,8 @@ func (ms *MarketData) StartFutureWsMarkPrice() error {
}
func (ms *MarketData) futureWsMarkPriceHandler(event futures.WsAllMarkPriceEvent) {
ms.mu.Lock()
defer ms.mu.Unlock()
for _, priceEvent := range event {
price, err := strconv.ParseFloat(priceEvent.MarkPrice, 64)
if err != nil {
@@ -46,5 +50,15 @@ func (ms *MarketData) futureWsErrHandler(err error) {
}
func (ms *MarketData) GetAllFundRate() (map[string]float64, map[string]int64) {
return ms.futureFundingRate, ms.futureNextFundingTime
ms.mu.RLock()
defer ms.mu.RUnlock()
rates := make(map[string]float64, len(ms.futureFundingRate))
for k, v := range ms.futureFundingRate {
rates[k] = v
}
times := make(map[string]int64, len(ms.futureNextFundingTime))
for k, v := range ms.futureNextFundingTime {
times[k] = v
}
return rates, times
}
+18 -2
View File
@@ -1,13 +1,24 @@
package market
import "github.com/rs/zerolog/log"
import (
"sync"
"time"
"github.com/rs/zerolog/log"
)
type MarketData struct {
mu sync.RWMutex
futureMarkPrice map[string]float64
futureFundingRate map[string]float64
futureNextFundingTime map[string]int64
spotPrice map[string]float64
// Alpha token cache
alphaTokens map[string]AlphaTokenInfo
alphaCacheMutex sync.RWMutex
lastAlphaCacheUpdate time.Time
}
func NewMarketData() *MarketData {
@@ -17,9 +28,14 @@ func NewMarketData() *MarketData {
futureFundingRate: make(map[string]float64),
futureNextFundingTime: make(map[string]int64),
spotPrice: make(map[string]float64),
spotPrice: make(map[string]float64),
alphaTokens: make(map[string]AlphaTokenInfo),
}
_ = ms.StartFutureWsMarkPrice()
_ = ms.StartSpotWsMarkPrice()
// Initialize Alpha token cache
go ms.refreshAlphaTokenCache()
return ms
}
+25 -2
View File
@@ -1,14 +1,35 @@
package market
import (
"strconv"
"github.com/adshao/go-binance/v2"
"github.com/rs/zerolog/log"
"strconv"
)
func (ms *MarketData) GetSpotPrice(symbol string) (float64, bool) {
ms.mu.RLock()
p, ok := ms.spotPrice[symbol]
return p, ok
ms.mu.RUnlock()
if ok {
return p, true
}
// If not found, check if it's an Alpha token and ensure cache is loaded
ms.ensureAlphaCacheLoaded()
if ms.IsAlphaToken(symbol) {
if alphaToken, exists := ms.GetAlphaToken(symbol); exists {
price := alphaToken.GetPrice()
if price > 0 {
ms.mu.Lock()
ms.spotPrice[symbol] = price
ms.mu.Unlock()
return price, true
}
}
}
return 0, false
}
func (ms *MarketData) StartSpotWsMarkPrice() error {
@@ -21,6 +42,8 @@ func (ms *MarketData) StartSpotWsMarkPrice() error {
}
func (ms *MarketData) spotWsAllMarketsStatHandler(event binance.WsAllMarketsStatEvent) {
ms.mu.Lock()
defer ms.mu.Unlock()
for _, priceEvent := range event {
price, err := strconv.ParseFloat(priceEvent.LastPrice, 64)
if err != nil {