Compare commits

4 Commits
temp ... master

8 changed files with 81 additions and 38 deletions

View File

@@ -1,38 +1,26 @@
package main
import (
"fmt"
"sipro-mps/internal/ym/products"
"context"
"sync"
"time"
)
// package main
//
// import (
//
// "context"
// "fmt"
// "sipro-mps/internal/wb/products/mapping/generated"
// "strings"
//
// pb "sipro-mps/api/generated/v1/wb/products"
// "sipro-mps/pkg/api/wb/client"
//
// "github.com/deliveryhero/pipeline/v2"
//
// )
//
// func main() {
//
// input := "1,2,3,4,5"
//
// for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
// for j := range out {
// fmt.Printf("process: %s\n", out[j])
// }
// }
// }
func main() {
rs := products.GetCategoriesError("{\"status\":\"ERROR\",\"errors\":[{\"code\":\"BAD_REQUEST\",\"message\":\"Following categories are not leaf categories: 91735\"}]}")
fmt.Println(rs)
ctx, cacnel := context.WithTimeout(context.Background(), time.Second*5)
defer cacnel()
wg := sync.WaitGroup{}
go func() {
defer func() {
wg.Done()
println("goroutine exit")
}()
select {
case <-ctx.Done():
println("context done")
return
}
}()
wg.Add(1)
wg.Wait()
}

18
go.sum
View File

@@ -61,6 +61,8 @@ github.com/google/cel-go v0.24.1/go.mod h1:Hdf9TqOaTNSFQA1ybQaRqATVoK7m/zcf7IMhG
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
@@ -248,11 +250,27 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.25.2 h1:T2oH7sZdGvTaie0BRNFbIYsabzCxUQg8nLqCdQ2i0ic=
modernc.org/cc/v4 v4.25.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.25.1 h1:TFSzPrAGmDsdnhT9X2UrcPMI3N/mJ9/X9ykKXwLhDsU=
modernc.org/ccgo/v4 v4.25.1/go.mod h1:njjuAYiPflywOOrm3B7kCB444ONP5pAVr8PIEoE0uDw=
modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/libc v1.62.1 h1:s0+fv5E3FymN8eJVmnk0llBe6rOxCu/DEU+XygRbS8s=
modernc.org/libc v1.62.1/go.mod h1:iXhATfJQLjG3NWy56a6WVU73lWOcdYVxsvwCgoPljuo=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.9.1 h1:V/Z1solwAVmMW1yttq3nDdZPJqV1rM05Ccq6KMSZ34g=
modernc.org/memory v1.9.1/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.37.0 h1:s1TMe7T3Q3ovQiK2Ouz4Jwh7dw4ZDqbebSDTlSJdfjI=
modernc.org/sqlite v1.37.0/go.mod h1:5YiWv+YviqGMuGw4V+PNplcyaJ5v+vQd7TQOgkACoJM=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

View File

@@ -34,14 +34,16 @@ func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<-
defer close(resultChan)
lastId := ""
for {
if ctx.Err() != nil {
return
}
resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{
Filter: api.GetListOfProductsFilter{Visibility: "ALL"},
LastId: lastId,
Limit: 1000,
})
if err != nil {
// dev
//panic(err)
errChan <- fmt.Errorf("fetching product IDs: %w", err)
return
}
@@ -70,11 +72,15 @@ func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-cha
wg.Add(1)
go func() {
defer wg.Done()
if ctx.Err() != nil {
return
}
resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{
ProductId: productIds,
})
if err != nil {
// dev
//panic(err)
errChan <- fmt.Errorf("fetching products: %w", err)
return
@@ -143,7 +149,6 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
if err == nil && len(cachedMessage.Products) > 0 {
resultChan <- utils.DerefSlice(cachedMessage.Products)
//_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId)
return
}
@@ -161,6 +166,9 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
converter := generated.ConverterImpl{}
var allProducts []PbProduct
defer func() {
if ctx.Err() != nil {
return
}
if len(allProducts) == 0 {
return
}
@@ -189,6 +197,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
if !ok {
return
}
case <-ctx.Done():
return
}
}
}

View File

@@ -13,7 +13,9 @@ func InitLocker() error {
redisAddr := os.Getenv("REDIS_ADDR")
password := os.Getenv("REDIS_PASSWORD")
locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
KeyMajority: 1,
NoLoopTracking: true,
})
if err != nil {
return err

View File

@@ -30,6 +30,7 @@ func WbDimensionsToProto(s wbclient.OptContentV2GetCardsListPostOKCardsItemDimen
value := s.Value
result.Length = int64(value.Length.Value)
result.Width = int64(value.Width.Value)
result.Height = int64(value.Height.Value)
result.WeightBrutto = float32(value.WeightBrutto.Value)
result.IsValid = value.IsValid.Value
return &result

View File

@@ -11,6 +11,7 @@ import (
wbapi "sipro-mps/pkg/api/wb/client"
"sipro-mps/pkg/utils"
"github.com/go-faster/errors"
"github.com/samber/lo"
)
@@ -57,10 +58,15 @@ func fetchProducts(
request.Settings.Value.Filter.SetTo(wbapi.ContentV2GetCardsListPostReqSettingsFilter{})
request.Settings.Value.Filter.Value.WithPhoto.SetTo(-1)
request.Settings.Value.Sort.Value.Ascending.SetTo(true)
currentRetry := 0
for {
response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")})
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
currentRetry++
if currentRetry >= maxRetries {
errChan <- fmt.Errorf("fetching product IDs: %w", err)
@@ -83,6 +89,8 @@ func fetchProducts(
}
request.Settings.Value.Cursor.Value.UpdatedAt.SetTo(r.Response.Cursor.Value.UpdatedAt.Value)
request.Settings.Value.Cursor.Value.NmID.SetTo(r.Response.Cursor.Value.NmID.Value)
request.Settings.Value.Sort.Value.Ascending.SetTo(true)
case *wbapi.R429Headers:
err = wb.SetRateLimitRetry(ctx, sellerId, r.XRatelimitRetry.Value, r.XRatelimitLimit.Value, r.XRatelimitReset.Value)
if err != nil {
@@ -116,7 +124,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
if err == nil && len(cachedMessage.Products) > 0 {
resultChan <- utils.DerefSlice(cachedMessage.Products)
//_ = client.EnqueueFetchProductsTask(types.TypeWbFetchProducts, marketplaceId)
return
}
locker := *redis.Locker
@@ -131,6 +138,9 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
var allProducts []pb.Product
defer func() {
if ctx.Err() != nil {
return
}
if len(allProducts) == 0 {
return
}
@@ -138,7 +148,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
Products: utils.ToPtrs(allProducts),
}
_ = redis.WriteProtoMessage(ctx, key, &message)
}()
for {
select {
@@ -157,6 +166,8 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
})
allProducts = append(allProducts, pbProducts...)
resultChan <- pbProducts
case <-ctx.Done():
return
}
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
"sipro-mps/internal/redis"
"sipro-mps/pkg/utils"
"time"
@@ -69,6 +70,9 @@ type RateLimitTransport struct {
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
req.Header.Set("X-Client-Secret", os.Getenv("WB_SECRET_TOKEN"))
req.Header.Set("User-Agent", "wbas_seller.denco.store3547")
tokenString := req.Header.Get("Authorization")
authData := utils.NewWbAuthData(tokenString)
authDataBytes, err := json.Marshal(authData)

View File

@@ -128,6 +128,9 @@ func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req
// all offers
var allOffers []*pb.GetProductsResponse_Offer
defer func() {
if ctx.Err() != nil {
return
}
// store only if offers is 90% or more then requested
if len(allOffers) < int(float64(len(req.OfferIds))*0.9) {
return
@@ -243,10 +246,16 @@ func (r *apiRepository) CalculateProductTariffs(ctx context.Context, marketplace
offerChunks := lo.Chunk(req.Offers, defaultChunkSize)
maxRetries := 5
for chunkIndex, offerChunk := range offerChunks {
if ctx.Err() != nil {
return
}
fmt.Printf("Processing chunk %d/%d with %d offers\n", chunkIndex+1, len(offerChunks), len(offerChunk))
var globalError error = nil
for range maxRetries {
response, err := r.processTariffChunk(ctx, client, ymParameters, offerChunk, chunkIndex)
if ctx.Err() != nil {
return
}
if err != nil {
globalError = err
continue