Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 71684004a0 | |||
| e911403bba | |||
| 328b4cc66e | |||
| ed7e7608fe |
@@ -1,38 +1,26 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sipro-mps/internal/ym/products"
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// package main
|
||||
//
|
||||
// import (
|
||||
//
|
||||
// "context"
|
||||
// "fmt"
|
||||
// "sipro-mps/internal/wb/products/mapping/generated"
|
||||
// "strings"
|
||||
//
|
||||
// pb "sipro-mps/api/generated/v1/wb/products"
|
||||
// "sipro-mps/pkg/api/wb/client"
|
||||
//
|
||||
// "github.com/deliveryhero/pipeline/v2"
|
||||
//
|
||||
// )
|
||||
//
|
||||
// func main() {
|
||||
//
|
||||
// input := "1,2,3,4,5"
|
||||
//
|
||||
// for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
|
||||
// for j := range out {
|
||||
// fmt.Printf("process: %s\n", out[j])
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
func main() {
|
||||
rs := products.GetCategoriesError("{\"status\":\"ERROR\",\"errors\":[{\"code\":\"BAD_REQUEST\",\"message\":\"Following categories are not leaf categories: 91735\"}]}")
|
||||
fmt.Println(rs)
|
||||
ctx, cacnel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cacnel()
|
||||
wg := sync.WaitGroup{}
|
||||
go func() {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
println("goroutine exit")
|
||||
}()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
println("context done")
|
||||
return
|
||||
}
|
||||
}()
|
||||
wg.Add(1)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
18
go.sum
18
go.sum
@@ -61,6 +61,8 @@ github.com/google/cel-go v0.24.1/go.mod h1:Hdf9TqOaTNSFQA1ybQaRqATVoK7m/zcf7IMhG
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
|
||||
@@ -248,11 +250,27 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
modernc.org/cc/v4 v4.25.2 h1:T2oH7sZdGvTaie0BRNFbIYsabzCxUQg8nLqCdQ2i0ic=
|
||||
modernc.org/cc/v4 v4.25.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
|
||||
modernc.org/ccgo/v4 v4.25.1 h1:TFSzPrAGmDsdnhT9X2UrcPMI3N/mJ9/X9ykKXwLhDsU=
|
||||
modernc.org/ccgo/v4 v4.25.1/go.mod h1:njjuAYiPflywOOrm3B7kCB444ONP5pAVr8PIEoE0uDw=
|
||||
modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
|
||||
modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
|
||||
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
|
||||
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
|
||||
modernc.org/libc v1.62.1 h1:s0+fv5E3FymN8eJVmnk0llBe6rOxCu/DEU+XygRbS8s=
|
||||
modernc.org/libc v1.62.1/go.mod h1:iXhATfJQLjG3NWy56a6WVU73lWOcdYVxsvwCgoPljuo=
|
||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
|
||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
|
||||
modernc.org/memory v1.9.1 h1:V/Z1solwAVmMW1yttq3nDdZPJqV1rM05Ccq6KMSZ34g=
|
||||
modernc.org/memory v1.9.1/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
|
||||
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
|
||||
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
|
||||
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
|
||||
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
|
||||
modernc.org/sqlite v1.37.0 h1:s1TMe7T3Q3ovQiK2Ouz4Jwh7dw4ZDqbebSDTlSJdfjI=
|
||||
modernc.org/sqlite v1.37.0/go.mod h1:5YiWv+YviqGMuGw4V+PNplcyaJ5v+vQd7TQOgkACoJM=
|
||||
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
|
||||
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
|
||||
@@ -34,14 +34,16 @@ func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<-
|
||||
defer close(resultChan)
|
||||
lastId := ""
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{
|
||||
Filter: api.GetListOfProductsFilter{Visibility: "ALL"},
|
||||
LastId: lastId,
|
||||
Limit: 1000,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// dev
|
||||
//panic(err)
|
||||
errChan <- fmt.Errorf("fetching product IDs: %w", err)
|
||||
return
|
||||
}
|
||||
@@ -70,11 +72,15 @@ func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-cha
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{
|
||||
ProductId: productIds,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// dev
|
||||
|
||||
//panic(err)
|
||||
errChan <- fmt.Errorf("fetching products: %w", err)
|
||||
return
|
||||
@@ -143,7 +149,6 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
|
||||
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
|
||||
if err == nil && len(cachedMessage.Products) > 0 {
|
||||
resultChan <- utils.DerefSlice(cachedMessage.Products)
|
||||
//_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -161,6 +166,9 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
|
||||
converter := generated.ConverterImpl{}
|
||||
var allProducts []PbProduct
|
||||
defer func() {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if len(allProducts) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -189,6 +197,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,9 @@ func InitLocker() error {
|
||||
redisAddr := os.Getenv("REDIS_ADDR")
|
||||
password := os.Getenv("REDIS_PASSWORD")
|
||||
locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
|
||||
ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
|
||||
ClientOption: rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
|
||||
KeyMajority: 1,
|
||||
NoLoopTracking: true,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -30,6 +30,7 @@ func WbDimensionsToProto(s wbclient.OptContentV2GetCardsListPostOKCardsItemDimen
|
||||
value := s.Value
|
||||
result.Length = int64(value.Length.Value)
|
||||
result.Width = int64(value.Width.Value)
|
||||
result.Height = int64(value.Height.Value)
|
||||
result.WeightBrutto = float32(value.WeightBrutto.Value)
|
||||
result.IsValid = value.IsValid.Value
|
||||
return &result
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
wbapi "sipro-mps/pkg/api/wb/client"
|
||||
"sipro-mps/pkg/utils"
|
||||
|
||||
"github.com/go-faster/errors"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
@@ -57,10 +58,15 @@ func fetchProducts(
|
||||
|
||||
request.Settings.Value.Filter.SetTo(wbapi.ContentV2GetCardsListPostReqSettingsFilter{})
|
||||
request.Settings.Value.Filter.Value.WithPhoto.SetTo(-1)
|
||||
|
||||
request.Settings.Value.Sort.Value.Ascending.SetTo(true)
|
||||
currentRetry := 0
|
||||
for {
|
||||
response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")})
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
currentRetry++
|
||||
if currentRetry >= maxRetries {
|
||||
errChan <- fmt.Errorf("fetching product IDs: %w", err)
|
||||
@@ -83,6 +89,8 @@ func fetchProducts(
|
||||
}
|
||||
request.Settings.Value.Cursor.Value.UpdatedAt.SetTo(r.Response.Cursor.Value.UpdatedAt.Value)
|
||||
request.Settings.Value.Cursor.Value.NmID.SetTo(r.Response.Cursor.Value.NmID.Value)
|
||||
request.Settings.Value.Sort.Value.Ascending.SetTo(true)
|
||||
|
||||
case *wbapi.R429Headers:
|
||||
err = wb.SetRateLimitRetry(ctx, sellerId, r.XRatelimitRetry.Value, r.XRatelimitLimit.Value, r.XRatelimitReset.Value)
|
||||
if err != nil {
|
||||
@@ -116,7 +124,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
|
||||
err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
|
||||
if err == nil && len(cachedMessage.Products) > 0 {
|
||||
resultChan <- utils.DerefSlice(cachedMessage.Products)
|
||||
//_ = client.EnqueueFetchProductsTask(types.TypeWbFetchProducts, marketplaceId)
|
||||
return
|
||||
}
|
||||
locker := *redis.Locker
|
||||
@@ -131,6 +138,9 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
|
||||
go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
|
||||
var allProducts []pb.Product
|
||||
defer func() {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if len(allProducts) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -138,7 +148,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
|
||||
Products: utils.ToPtrs(allProducts),
|
||||
}
|
||||
_ = redis.WriteProtoMessage(ctx, key, &message)
|
||||
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
@@ -157,6 +166,8 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
|
||||
})
|
||||
allProducts = append(allProducts, pbProducts...)
|
||||
resultChan <- pbProducts
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"sipro-mps/internal/redis"
|
||||
"sipro-mps/pkg/utils"
|
||||
"time"
|
||||
@@ -69,6 +70,9 @@ type RateLimitTransport struct {
|
||||
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
ctx := req.Context()
|
||||
|
||||
req.Header.Set("X-Client-Secret", os.Getenv("WB_SECRET_TOKEN"))
|
||||
req.Header.Set("User-Agent", "wbas_seller.denco.store3547")
|
||||
|
||||
tokenString := req.Header.Get("Authorization")
|
||||
authData := utils.NewWbAuthData(tokenString)
|
||||
authDataBytes, err := json.Marshal(authData)
|
||||
|
||||
@@ -128,6 +128,9 @@ func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req
|
||||
// all offers
|
||||
var allOffers []*pb.GetProductsResponse_Offer
|
||||
defer func() {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
// store only if offers is 90% or more then requested
|
||||
if len(allOffers) < int(float64(len(req.OfferIds))*0.9) {
|
||||
return
|
||||
@@ -243,10 +246,16 @@ func (r *apiRepository) CalculateProductTariffs(ctx context.Context, marketplace
|
||||
offerChunks := lo.Chunk(req.Offers, defaultChunkSize)
|
||||
maxRetries := 5
|
||||
for chunkIndex, offerChunk := range offerChunks {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
fmt.Printf("Processing chunk %d/%d with %d offers\n", chunkIndex+1, len(offerChunks), len(offerChunk))
|
||||
var globalError error = nil
|
||||
for range maxRetries {
|
||||
response, err := r.processTariffChunk(ctx, client, ymParameters, offerChunk, chunkIndex)
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
globalError = err
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user