Compare commits
	
		
			4 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 71684004a0 | |||
| e911403bba | |||
| 328b4cc66e | |||
| ed7e7608fe | 
@@ -1,38 +1,26 @@
 | 
				
			|||||||
package main
 | 
					package main
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"context"
 | 
				
			||||||
	"sipro-mps/internal/ym/products"
 | 
						"sync"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// package main
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// import (
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
//	"context"
 | 
					 | 
				
			||||||
//	"fmt"
 | 
					 | 
				
			||||||
//	"sipro-mps/internal/wb/products/mapping/generated"
 | 
					 | 
				
			||||||
//	"strings"
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
//	pb "sipro-mps/api/generated/v1/wb/products"
 | 
					 | 
				
			||||||
//	"sipro-mps/pkg/api/wb/client"
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
//	"github.com/deliveryhero/pipeline/v2"
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// )
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// func main() {
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
//		input := "1,2,3,4,5"
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
//		for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
 | 
					 | 
				
			||||||
//			for j := range out {
 | 
					 | 
				
			||||||
//				fmt.Printf("process: %s\n", out[j])
 | 
					 | 
				
			||||||
//			}
 | 
					 | 
				
			||||||
//		}
 | 
					 | 
				
			||||||
//	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func main() {
 | 
					func main() {
 | 
				
			||||||
	rs := products.GetCategoriesError("{\"status\":\"ERROR\",\"errors\":[{\"code\":\"BAD_REQUEST\",\"message\":\"Following categories are not leaf categories: 91735\"}]}")
 | 
						ctx, cacnel := context.WithTimeout(context.Background(), time.Second*5)
 | 
				
			||||||
	fmt.Println(rs)
 | 
						defer cacnel()
 | 
				
			||||||
 | 
						wg := sync.WaitGroup{}
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							defer func() {
 | 
				
			||||||
 | 
								wg.Done()
 | 
				
			||||||
 | 
								println("goroutine exit")
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-ctx.Done():
 | 
				
			||||||
 | 
								println("context done")
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
						wg.Add(1)
 | 
				
			||||||
 | 
						wg.Wait()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										18
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								go.sum
									
									
									
									
									
								
							@@ -61,6 +61,8 @@ github.com/google/cel-go v0.24.1/go.mod h1:Hdf9TqOaTNSFQA1ybQaRqATVoK7m/zcf7IMhG
 | 
				
			|||||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 | 
					github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 | 
				
			||||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
 | 
					github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
 | 
				
			||||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
 | 
					github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
 | 
				
			||||||
 | 
					github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
 | 
				
			||||||
 | 
					github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
 | 
				
			||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 | 
					github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 | 
				
			||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 | 
					github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 | 
				
			||||||
github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
 | 
					github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
 | 
				
			||||||
@@ -248,11 +250,27 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
 | 
				
			|||||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
					gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
				
			||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 | 
					gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 | 
				
			||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
					gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
				
			||||||
 | 
					modernc.org/cc/v4 v4.25.2 h1:T2oH7sZdGvTaie0BRNFbIYsabzCxUQg8nLqCdQ2i0ic=
 | 
				
			||||||
 | 
					modernc.org/cc/v4 v4.25.2/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
 | 
				
			||||||
 | 
					modernc.org/ccgo/v4 v4.25.1 h1:TFSzPrAGmDsdnhT9X2UrcPMI3N/mJ9/X9ykKXwLhDsU=
 | 
				
			||||||
 | 
					modernc.org/ccgo/v4 v4.25.1/go.mod h1:njjuAYiPflywOOrm3B7kCB444ONP5pAVr8PIEoE0uDw=
 | 
				
			||||||
 | 
					modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
 | 
				
			||||||
 | 
					modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
 | 
				
			||||||
 | 
					modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
 | 
				
			||||||
 | 
					modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
 | 
				
			||||||
modernc.org/libc v1.62.1 h1:s0+fv5E3FymN8eJVmnk0llBe6rOxCu/DEU+XygRbS8s=
 | 
					modernc.org/libc v1.62.1 h1:s0+fv5E3FymN8eJVmnk0llBe6rOxCu/DEU+XygRbS8s=
 | 
				
			||||||
modernc.org/libc v1.62.1/go.mod h1:iXhATfJQLjG3NWy56a6WVU73lWOcdYVxsvwCgoPljuo=
 | 
					modernc.org/libc v1.62.1/go.mod h1:iXhATfJQLjG3NWy56a6WVU73lWOcdYVxsvwCgoPljuo=
 | 
				
			||||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
 | 
					modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
 | 
				
			||||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
 | 
					modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
 | 
				
			||||||
modernc.org/memory v1.9.1 h1:V/Z1solwAVmMW1yttq3nDdZPJqV1rM05Ccq6KMSZ34g=
 | 
					modernc.org/memory v1.9.1 h1:V/Z1solwAVmMW1yttq3nDdZPJqV1rM05Ccq6KMSZ34g=
 | 
				
			||||||
modernc.org/memory v1.9.1/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
 | 
					modernc.org/memory v1.9.1/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
 | 
				
			||||||
 | 
					modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
 | 
				
			||||||
 | 
					modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
 | 
				
			||||||
 | 
					modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
 | 
				
			||||||
 | 
					modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
 | 
				
			||||||
modernc.org/sqlite v1.37.0 h1:s1TMe7T3Q3ovQiK2Ouz4Jwh7dw4ZDqbebSDTlSJdfjI=
 | 
					modernc.org/sqlite v1.37.0 h1:s1TMe7T3Q3ovQiK2Ouz4Jwh7dw4ZDqbebSDTlSJdfjI=
 | 
				
			||||||
modernc.org/sqlite v1.37.0/go.mod h1:5YiWv+YviqGMuGw4V+PNplcyaJ5v+vQd7TQOgkACoJM=
 | 
					modernc.org/sqlite v1.37.0/go.mod h1:5YiWv+YviqGMuGw4V+PNplcyaJ5v+vQd7TQOgkACoJM=
 | 
				
			||||||
 | 
					modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
 | 
				
			||||||
 | 
					modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
 | 
				
			||||||
 | 
					modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
 | 
				
			||||||
 | 
					modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,14 +34,16 @@ func fetchProductIds(ctx context.Context, client *api.Client, resultChan chan<-
 | 
				
			|||||||
	defer close(resultChan)
 | 
						defer close(resultChan)
 | 
				
			||||||
	lastId := ""
 | 
						lastId := ""
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
 | 
							if ctx.Err() != nil {
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{
 | 
							resp, err := client.Products().GetListOfProducts(ctx, &api.GetListOfProductsParams{
 | 
				
			||||||
			Filter: api.GetListOfProductsFilter{Visibility: "ALL"},
 | 
								Filter: api.GetListOfProductsFilter{Visibility: "ALL"},
 | 
				
			||||||
			LastId: lastId,
 | 
								LastId: lastId,
 | 
				
			||||||
			Limit:  1000,
 | 
								Limit:  1000,
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			// dev
 | 
					 | 
				
			||||||
			//panic(err)
 | 
					 | 
				
			||||||
			errChan <- fmt.Errorf("fetching product IDs: %w", err)
 | 
								errChan <- fmt.Errorf("fetching product IDs: %w", err)
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -70,11 +72,15 @@ func fetchProducts(ctx context.Context, client *api.Client, productIdsChan <-cha
 | 
				
			|||||||
		wg.Add(1)
 | 
							wg.Add(1)
 | 
				
			||||||
		go func() {
 | 
							go func() {
 | 
				
			||||||
			defer wg.Done()
 | 
								defer wg.Done()
 | 
				
			||||||
 | 
								if ctx.Err() != nil {
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{
 | 
								resp, err := client.Products().ListProductsByIDs(ctx, &api.ListProductsByIDsParams{
 | 
				
			||||||
				ProductId: productIds,
 | 
									ProductId: productIds,
 | 
				
			||||||
			})
 | 
								})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				// dev
 | 
					
 | 
				
			||||||
				//panic(err)
 | 
									//panic(err)
 | 
				
			||||||
				errChan <- fmt.Errorf("fetching products: %w", err)
 | 
									errChan <- fmt.Errorf("fetching products: %w", err)
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
@@ -143,7 +149,6 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
 | 
				
			|||||||
	err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
						err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
				
			||||||
	if err == nil && len(cachedMessage.Products) > 0 {
 | 
						if err == nil && len(cachedMessage.Products) > 0 {
 | 
				
			||||||
		resultChan <- utils.DerefSlice(cachedMessage.Products)
 | 
							resultChan <- utils.DerefSlice(cachedMessage.Products)
 | 
				
			||||||
		//_ = client.EnqueueFetchProductsTask(types.TypeOzonFetchProducts, marketplaceId)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -161,6 +166,9 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
 | 
				
			|||||||
	converter := generated.ConverterImpl{}
 | 
						converter := generated.ConverterImpl{}
 | 
				
			||||||
	var allProducts []PbProduct
 | 
						var allProducts []PbProduct
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
 | 
							if ctx.Err() != nil {
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		if len(allProducts) == 0 {
 | 
							if len(allProducts) == 0 {
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -189,6 +197,8 @@ func (a *apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceI
 | 
				
			|||||||
			if !ok {
 | 
								if !ok {
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
							case <-ctx.Done():
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -14,6 +14,8 @@ func InitLocker() error {
 | 
				
			|||||||
	password := os.Getenv("REDIS_PASSWORD")
 | 
						password := os.Getenv("REDIS_PASSWORD")
 | 
				
			||||||
	locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
 | 
						locker, err := rueidislock.NewLocker(rueidislock.LockerOption{
 | 
				
			||||||
		ClientOption:   rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
 | 
							ClientOption:   rueidis.ClientOption{InitAddress: []string{redisAddr}, Password: password},
 | 
				
			||||||
 | 
							KeyMajority:    1,
 | 
				
			||||||
 | 
							NoLoopTracking: true,
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -30,6 +30,7 @@ func WbDimensionsToProto(s wbclient.OptContentV2GetCardsListPostOKCardsItemDimen
 | 
				
			|||||||
	value := s.Value
 | 
						value := s.Value
 | 
				
			||||||
	result.Length = int64(value.Length.Value)
 | 
						result.Length = int64(value.Length.Value)
 | 
				
			||||||
	result.Width = int64(value.Width.Value)
 | 
						result.Width = int64(value.Width.Value)
 | 
				
			||||||
 | 
						result.Height = int64(value.Height.Value)
 | 
				
			||||||
	result.WeightBrutto = float32(value.WeightBrutto.Value)
 | 
						result.WeightBrutto = float32(value.WeightBrutto.Value)
 | 
				
			||||||
	result.IsValid = value.IsValid.Value
 | 
						result.IsValid = value.IsValid.Value
 | 
				
			||||||
	return &result
 | 
						return &result
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -11,6 +11,7 @@ import (
 | 
				
			|||||||
	wbapi "sipro-mps/pkg/api/wb/client"
 | 
						wbapi "sipro-mps/pkg/api/wb/client"
 | 
				
			||||||
	"sipro-mps/pkg/utils"
 | 
						"sipro-mps/pkg/utils"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/go-faster/errors"
 | 
				
			||||||
	"github.com/samber/lo"
 | 
						"github.com/samber/lo"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -57,10 +58,15 @@ func fetchProducts(
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	request.Settings.Value.Filter.SetTo(wbapi.ContentV2GetCardsListPostReqSettingsFilter{})
 | 
						request.Settings.Value.Filter.SetTo(wbapi.ContentV2GetCardsListPostReqSettingsFilter{})
 | 
				
			||||||
	request.Settings.Value.Filter.Value.WithPhoto.SetTo(-1)
 | 
						request.Settings.Value.Filter.Value.WithPhoto.SetTo(-1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						request.Settings.Value.Sort.Value.Ascending.SetTo(true)
 | 
				
			||||||
	currentRetry := 0
 | 
						currentRetry := 0
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")})
 | 
							response, err := client.ContentV2GetCardsListPost(ctx, &request, wbapi.ContentV2GetCardsListPostParams{Locale: wbapi.NewOptString("ru")})
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
 | 
								if errors.Is(err, context.Canceled) {
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			currentRetry++
 | 
								currentRetry++
 | 
				
			||||||
			if currentRetry >= maxRetries {
 | 
								if currentRetry >= maxRetries {
 | 
				
			||||||
				errChan <- fmt.Errorf("fetching product IDs: %w", err)
 | 
									errChan <- fmt.Errorf("fetching product IDs: %w", err)
 | 
				
			||||||
@@ -83,6 +89,8 @@ func fetchProducts(
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			request.Settings.Value.Cursor.Value.UpdatedAt.SetTo(r.Response.Cursor.Value.UpdatedAt.Value)
 | 
								request.Settings.Value.Cursor.Value.UpdatedAt.SetTo(r.Response.Cursor.Value.UpdatedAt.Value)
 | 
				
			||||||
			request.Settings.Value.Cursor.Value.NmID.SetTo(r.Response.Cursor.Value.NmID.Value)
 | 
								request.Settings.Value.Cursor.Value.NmID.SetTo(r.Response.Cursor.Value.NmID.Value)
 | 
				
			||||||
 | 
								request.Settings.Value.Sort.Value.Ascending.SetTo(true)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case *wbapi.R429Headers:
 | 
							case *wbapi.R429Headers:
 | 
				
			||||||
			err = wb.SetRateLimitRetry(ctx, sellerId, r.XRatelimitRetry.Value, r.XRatelimitLimit.Value, r.XRatelimitReset.Value)
 | 
								err = wb.SetRateLimitRetry(ctx, sellerId, r.XRatelimitRetry.Value, r.XRatelimitLimit.Value, r.XRatelimitReset.Value)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
@@ -116,7 +124,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
 | 
				
			|||||||
	err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
						err = redis.ReadProtoMessage(ctx, key, &cachedMessage)
 | 
				
			||||||
	if err == nil && len(cachedMessage.Products) > 0 {
 | 
						if err == nil && len(cachedMessage.Products) > 0 {
 | 
				
			||||||
		resultChan <- utils.DerefSlice(cachedMessage.Products)
 | 
							resultChan <- utils.DerefSlice(cachedMessage.Products)
 | 
				
			||||||
		//_ = client.EnqueueFetchProductsTask(types.TypeWbFetchProducts, marketplaceId)
 | 
					 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	locker := *redis.Locker
 | 
						locker := *redis.Locker
 | 
				
			||||||
@@ -131,6 +138,9 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
 | 
				
			|||||||
	go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
 | 
						go a.StreamAllProducts(ctx, marketplaceId, innerResultChan, innerErrChan)
 | 
				
			||||||
	var allProducts []pb.Product
 | 
						var allProducts []pb.Product
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
 | 
							if ctx.Err() != nil {
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		if len(allProducts) == 0 {
 | 
							if len(allProducts) == 0 {
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@@ -138,7 +148,6 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
 | 
				
			|||||||
			Products: utils.ToPtrs(allProducts),
 | 
								Products: utils.ToPtrs(allProducts),
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		_ = redis.WriteProtoMessage(ctx, key, &message)
 | 
							_ = redis.WriteProtoMessage(ctx, key, &message)
 | 
				
			||||||
 | 
					 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
@@ -157,6 +166,8 @@ func (a apiRepository) StreamAllProductsCache(ctx context.Context, marketplaceId
 | 
				
			|||||||
			})
 | 
								})
 | 
				
			||||||
			allProducts = append(allProducts, pbProducts...)
 | 
								allProducts = append(allProducts, pbProducts...)
 | 
				
			||||||
			resultChan <- pbProducts
 | 
								resultChan <- pbProducts
 | 
				
			||||||
 | 
							case <-ctx.Done():
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -5,6 +5,7 @@ import (
 | 
				
			|||||||
	"encoding/json"
 | 
						"encoding/json"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
 | 
						"os"
 | 
				
			||||||
	"sipro-mps/internal/redis"
 | 
						"sipro-mps/internal/redis"
 | 
				
			||||||
	"sipro-mps/pkg/utils"
 | 
						"sipro-mps/pkg/utils"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
@@ -69,6 +70,9 @@ type RateLimitTransport struct {
 | 
				
			|||||||
func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 | 
					func (t *RateLimitTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
	ctx := req.Context()
 | 
						ctx := req.Context()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						req.Header.Set("X-Client-Secret", os.Getenv("WB_SECRET_TOKEN"))
 | 
				
			||||||
 | 
						req.Header.Set("User-Agent", "wbas_seller.denco.store3547")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	tokenString := req.Header.Get("Authorization")
 | 
						tokenString := req.Header.Get("Authorization")
 | 
				
			||||||
	authData := utils.NewWbAuthData(tokenString)
 | 
						authData := utils.NewWbAuthData(tokenString)
 | 
				
			||||||
	authDataBytes, err := json.Marshal(authData)
 | 
						authDataBytes, err := json.Marshal(authData)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -128,6 +128,9 @@ func (r *apiRepository) GetProducts(ctx context.Context, marketplaceID int, req
 | 
				
			|||||||
	// all offers
 | 
						// all offers
 | 
				
			||||||
	var allOffers []*pb.GetProductsResponse_Offer
 | 
						var allOffers []*pb.GetProductsResponse_Offer
 | 
				
			||||||
	defer func() {
 | 
						defer func() {
 | 
				
			||||||
 | 
							if ctx.Err() != nil {
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		// store only if offers is 90% or more then requested
 | 
							// store only if offers is 90% or more then requested
 | 
				
			||||||
		if len(allOffers) < int(float64(len(req.OfferIds))*0.9) {
 | 
							if len(allOffers) < int(float64(len(req.OfferIds))*0.9) {
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
@@ -243,10 +246,16 @@ func (r *apiRepository) CalculateProductTariffs(ctx context.Context, marketplace
 | 
				
			|||||||
	offerChunks := lo.Chunk(req.Offers, defaultChunkSize)
 | 
						offerChunks := lo.Chunk(req.Offers, defaultChunkSize)
 | 
				
			||||||
	maxRetries := 5
 | 
						maxRetries := 5
 | 
				
			||||||
	for chunkIndex, offerChunk := range offerChunks {
 | 
						for chunkIndex, offerChunk := range offerChunks {
 | 
				
			||||||
 | 
							if ctx.Err() != nil {
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		fmt.Printf("Processing chunk %d/%d with %d offers\n", chunkIndex+1, len(offerChunks), len(offerChunk))
 | 
							fmt.Printf("Processing chunk %d/%d with %d offers\n", chunkIndex+1, len(offerChunks), len(offerChunk))
 | 
				
			||||||
		var globalError error = nil
 | 
							var globalError error = nil
 | 
				
			||||||
		for range maxRetries {
 | 
							for range maxRetries {
 | 
				
			||||||
			response, err := r.processTariffChunk(ctx, client, ymParameters, offerChunk, chunkIndex)
 | 
								response, err := r.processTariffChunk(ctx, client, ymParameters, offerChunk, chunkIndex)
 | 
				
			||||||
 | 
								if ctx.Err() != nil {
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				globalError = err
 | 
									globalError = err
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user