Skip to content

Commit 2950c55

Browse files
committed
add download-speed probe item & runtime option.
1 parent 2eb03bf commit 2950c55

File tree

3 files changed

+254
-16
lines changed

3 files changed

+254
-16
lines changed

lib/const.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ const (
105105
OptionCloudBoxID = "cloudBoxID"
106106
OptionQueryParam = "queryParam"
107107
OptionForcePathStyle = "forcePathStyle"
108+
OptionRuntime = "runtime"
108109
)
109110

110111
// the elements show in stat object

lib/option.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,9 @@ var OptionMap = map[string]Option{
284284
OptionForcePathStyle: Option{"", "--force-path-style", "", OptionTypeFlagTrue, "", "",
285285
"使用 path style 访问方式",
286286
"Use path-style access "},
287+
OptionRuntime: Option{"", "--runtime", "", OptionTypeInt64, "", "",
288+
"设置命令的持续的运行时间",
289+
"specifies the max running time of the command."},
287290
}
288291

289292
func (T *Option) getHelp(language string) string {

lib/probe.go

Lines changed: 250 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package lib
33
import (
44
"bytes"
55
"container/list"
6+
"context"
67
"fmt"
78
"io"
89
"io/ioutil"
@@ -366,6 +367,9 @@ var probeCommand = ProbeCommand{
366367
OptionRegion,
367368
OptionCloudBoxID,
368369
OptionForcePathStyle,
370+
OptionParallel,
371+
OptionPartSize,
372+
OptionRuntime,
369373
},
370374
},
371375
}
@@ -498,6 +502,8 @@ func (pc *ProbeCommand) RunCommand() error {
498502
}
499503
} else if pc.pbOption.probeItem == "upload-speed" || pc.pbOption.probeItem == "download-speed" {
500504
err = pc.DetectBandWidth()
505+
} else if pc.pbOption.probeItem == "download-time" {
506+
err = pc.DetectDownloadTime()
501507
} else {
502508
err = fmt.Errorf("not support %s", pc.pbOption.probeItem)
503509
}
@@ -526,26 +532,35 @@ func (pc *ProbeCommand) RunCommand() error {
526532
return err
527533
}
528534

529-
func (pc *ProbeCommand) PutObject(bucket *oss.Bucket, st *StatBandWidth, reader io.Reader) {
530-
var options []oss.Option
531-
options = append(options, oss.Progress(st))
532-
uniqKey := strconv.FormatInt(time.Now().UnixNano(), 10) + "-" + randStr(10)
533-
objectName := objectPrefex + uniqKey
534-
err := bucket.PutObject(objectName, reader, options...)
535-
if err != nil && !strings.Contains(err.Error(), "ossutil probe closed") {
536-
fmt.Printf("%s\n", err.Error())
535+
func (pc *ProbeCommand) PutObjectWithContext(bucket *oss.Bucket, st *StatBandWidth, reader io.Reader, ctx context.Context) {
536+
for {
537+
var options []oss.Option
538+
options = append(options, oss.Progress(st), oss.WithContext(ctx))
539+
uniqKey := strconv.FormatInt(time.Now().UnixNano(), 10) + "-" + randStr(10)
540+
objectName := objectPrefex + uniqKey
541+
err := bucket.PutObject(objectName, reader, options...)
542+
if err != nil {
543+
select {
544+
case <-ctx.Done():
545+
return
546+
default:
547+
}
548+
}
537549
}
538550
}
539551

540-
func (pc *ProbeCommand) GetObject(bucket *oss.Bucket, objectName string, st *StatBandWidth) {
552+
func (pc *ProbeCommand) GetObjectWithContext(bucket *oss.Bucket, objectName string, st *StatBandWidth, ctx context.Context) {
541553
var options []oss.Option
542-
options = append(options, oss.Progress(st))
554+
options = append(options, oss.Progress(st), oss.WithContext(ctx))
543555
options = append(options, oss.AcceptEncoding("identity"))
544556
for {
545557
result, err := bucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: objectName}, options)
546558
if err != nil {
547-
fmt.Printf("GetObject error,%s", err.Error())
548-
return
559+
select {
560+
case <-ctx.Done():
561+
return
562+
default:
563+
}
549564
}
550565
io.Copy(ioutil.Discard, result.Response.Body)
551566
result.Response.Close()
@@ -585,13 +600,15 @@ func (pc *ProbeCommand) DetectBandWidth() error {
585600
if pc.pbOption.probeItem == "upload-speed" {
586601
appendReader.RandText = []byte(strings.Repeat("1", 32*1024))
587602
}
603+
ctx := context.Background()
604+
ctx, cancel := context.WithCancel(ctx)
588605

589606
for i := 0; i < numCpu; i++ {
590607
time.Sleep(time.Duration(50) * time.Millisecond)
591608
if pc.pbOption.probeItem == "upload-speed" {
592-
go pc.PutObject(bucket, &statBandwidth, &appendReader)
609+
go pc.PutObjectWithContext(bucket, &statBandwidth, &appendReader, ctx)
593610
} else if pc.pbOption.probeItem == "download-speed" {
594-
go pc.GetObject(bucket, pc.pbOption.objectName, &statBandwidth)
611+
go pc.GetObjectWithContext(bucket, pc.pbOption.objectName, &statBandwidth, ctx)
595612
}
596613
}
597614

@@ -641,9 +658,9 @@ func (pc *ProbeCommand) DetectBandWidth() error {
641658
for i := 0; i < addParallel; i++ {
642659
time.Sleep(time.Duration(50) * time.Millisecond)
643660
if pc.pbOption.probeItem == "upload-speed" {
644-
go pc.PutObject(bucket, &statBandwidth, &appendReader)
661+
go pc.PutObjectWithContext(bucket, &statBandwidth, &appendReader, ctx)
645662
} else if pc.pbOption.probeItem == "download-speed" {
646-
go pc.GetObject(bucket, pc.pbOption.objectName, &statBandwidth)
663+
go pc.GetObjectWithContext(bucket, pc.pbOption.objectName, &statBandwidth, ctx)
647664
}
648665
}
649666
fmt.Printf("\n")
@@ -655,6 +672,7 @@ func (pc *ProbeCommand) DetectBandWidth() error {
655672
}
656673
}
657674

675+
cancel()
658676
appendReader.Close()
659677

660678
maxIndex := 0
@@ -668,6 +686,46 @@ func (pc *ProbeCommand) DetectBandWidth() error {
668686

669687
fmt.Printf("\nsuggest parallel is %d, max average speed is %.2f(KB/s)\n", averageList[maxIndex].Parallel, averageList[maxIndex].AveSpeed)
670688

689+
maxRuntime, _ := GetInt(OptionRuntime, pc.command.options)
690+
691+
if maxRuntime > 0 {
692+
time.Sleep(time.Duration(5) * time.Second)
693+
ctx = context.Background()
694+
ctx, cancel = context.WithCancel(ctx)
695+
addParallel = averageList[maxIndex].Parallel
696+
statBandwidth.Reset(addParallel)
697+
fmt.Printf("\nrun %s %d seconds with parallel %d\n", pc.pbOption.probeItem, maxRuntime, addParallel)
698+
for i := 0; i < addParallel; i++ {
699+
if pc.pbOption.probeItem == "upload-speed" {
700+
go pc.PutObjectWithContext(bucket, &statBandwidth, &appendReader, ctx)
701+
} else if pc.pbOption.probeItem == "download-speed" {
702+
go pc.GetObjectWithContext(bucket, pc.pbOption.objectName, &statBandwidth, ctx)
703+
}
704+
}
705+
706+
startT := time.Now().UnixNano() / 1000 / 1000 / 1000
707+
for {
708+
time.Sleep(time.Duration(1) * time.Second)
709+
nowStat = statBandwidth.GetStat()
710+
nowTick = time.Now().UnixNano() / 1000 / 1000
711+
712+
nowSpeed := float64(nowStat.TotalBytes-oldStat.TotalBytes) / 1024
713+
averSpeed := float64(nowStat.TotalBytes/1024) / float64((nowTick-nowStat.StartTick)/1000)
714+
maxSpeed := nowStat.MaxSpeed
715+
if nowSpeed > maxSpeed {
716+
maxSpeed = nowSpeed
717+
statBandwidth.SetMaxSpeed(maxSpeed)
718+
}
719+
fmt.Printf("\rparallel:%d,average speed:%.2f(KB/s),current speed:%.2f(KB/s),max speed:%.2f(KB/s)", addParallel, averSpeed, nowSpeed, maxSpeed)
720+
oldStat = nowStat
721+
currT := time.Now().UnixNano() / 1000 / 1000 / 1000
722+
if startT+maxRuntime < currT {
723+
cancel()
724+
break
725+
}
726+
}
727+
}
728+
671729
return nil
672730
}
673731

@@ -1317,3 +1375,179 @@ func (pc *ProbeCommand) probeUploadFileNormal(absFileName string, objectName str
13171375
}
13181376
return nil
13191377
}
1378+
1379+
type downloadPart struct {
1380+
Index int // Part number, starting from 0
1381+
Start int64 // Start index
1382+
End int64 // End index
1383+
}
1384+
1385+
type downloadWorkerArg struct {
1386+
bucket *oss.Bucket
1387+
key string
1388+
}
1389+
1390+
func getPartEnd(begin int64, total int64, per int64) int64 {
1391+
if begin+per > total {
1392+
return total - 1
1393+
}
1394+
return begin + per - 1
1395+
}
1396+
1397+
func downloadWorker(id int, arg downloadWorkerArg, jobs <-chan downloadPart, results chan<- downloadPart, failed chan<- error, die <-chan bool, st *StatBandWidth) {
1398+
for part := range jobs {
1399+
var options []oss.Option
1400+
r := oss.Range(part.Start, part.End)
1401+
options = append(options, r, oss.Progress(st))
1402+
options = append(options, oss.AcceptEncoding("identity"))
1403+
result, err := arg.bucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: arg.key}, options)
1404+
if err != nil {
1405+
fmt.Printf("GetObject error,%s", err.Error())
1406+
return
1407+
}
1408+
_, err = io.Copy(ioutil.Discard, result.Response.Body)
1409+
result.Response.Close()
1410+
1411+
select {
1412+
case <-die:
1413+
return
1414+
default:
1415+
}
1416+
1417+
if err != nil {
1418+
failed <- err
1419+
break
1420+
}
1421+
results <- part
1422+
}
1423+
}
1424+
1425+
func downloadScheduler(jobs chan downloadPart, parts []downloadPart) {
1426+
for _, part := range parts {
1427+
jobs <- part
1428+
}
1429+
close(jobs)
1430+
}
1431+
1432+
func (pc *ProbeCommand) DetectDownloadTime() error {
1433+
if pc.pbOption.bucketName == "" {
1434+
return fmt.Errorf("--bucketname is empty")
1435+
}
1436+
1437+
bucket, err := pc.command.ossBucket(pc.pbOption.bucketName)
1438+
if err != nil {
1439+
return err
1440+
}
1441+
1442+
//if pc.pbOption.probeItem == "download-time" {
1443+
if pc.pbOption.objectName == "" {
1444+
return fmt.Errorf("--object is empty when probe-item is download-time")
1445+
}
1446+
1447+
bExist, err := bucket.IsObjectExist(pc.pbOption.objectName)
1448+
if err != nil {
1449+
return err
1450+
}
1451+
1452+
if !bExist {
1453+
return fmt.Errorf("oss object is not exist,%s", pc.pbOption.objectName)
1454+
}
1455+
1456+
meta, err := bucket.GetObjectDetailedMeta(pc.pbOption.objectName)
1457+
if err != nil {
1458+
return err
1459+
}
1460+
1461+
objectSize, err := strconv.ParseInt(meta.Get(oss.HTTPHeaderContentLength), 10, 64)
1462+
if err != nil {
1463+
return err
1464+
}
1465+
var offset int64
1466+
//var partSize int64
1467+
parts := []downloadPart{}
1468+
partSize, _ := GetInt(OptionPartSize, pc.command.options)
1469+
parallel, _ := GetInt(OptionParallel, pc.command.options)
1470+
if parallel <= 0 {
1471+
parallel = 1
1472+
}
1473+
1474+
if partSize > 0 {
1475+
i := 0
1476+
for offset = 0; offset < objectSize; offset += partSize {
1477+
part := downloadPart{}
1478+
part.Index = i
1479+
part.Start = offset
1480+
part.End = getPartEnd(offset, objectSize, partSize)
1481+
parts = append(parts, part)
1482+
i++
1483+
}
1484+
} else {
1485+
part := downloadPart{}
1486+
part.Index = 0
1487+
part.Start = 0
1488+
part.End = objectSize - 1
1489+
parts = append(parts, part)
1490+
}
1491+
1492+
//}
1493+
jobs := make(chan downloadPart, len(parts))
1494+
results := make(chan downloadPart, len(parts))
1495+
failed := make(chan error)
1496+
die := make(chan bool)
1497+
routines := int(parallel)
1498+
var statBandwidth StatBandWidth
1499+
statBandwidth.Reset(int(parallel))
1500+
1501+
//fmt.Printf("\nDetectDownloadTime, partSize :%v, objectSize:%v, parallel:%v\n", partSize, objectSize, parallel)
1502+
1503+
arg := downloadWorkerArg{bucket, pc.pbOption.objectName}
1504+
for w := 1; w <= routines; w++ {
1505+
go downloadWorker(w, arg, jobs, results, failed, die, &statBandwidth)
1506+
}
1507+
1508+
// Download parts concurrently
1509+
go downloadScheduler(jobs, parts)
1510+
1511+
go func() {
1512+
oldStat := statBandwidth.GetStat()
1513+
for {
1514+
time.Sleep(time.Duration(2) * time.Second)
1515+
nowStat := statBandwidth.GetStat()
1516+
nowTick := time.Now().UnixNano() / 1000 / 1000
1517+
1518+
nowSpeed := float64(nowStat.TotalBytes-oldStat.TotalBytes) / 1024
1519+
averSpeed := float64(nowStat.TotalBytes/1024) / float64((nowTick-nowStat.StartTick)/1000)
1520+
maxSpeed := nowStat.MaxSpeed
1521+
if nowSpeed > maxSpeed {
1522+
maxSpeed = nowSpeed
1523+
statBandwidth.SetMaxSpeed(maxSpeed)
1524+
}
1525+
oldStat = nowStat
1526+
fmt.Printf("\rdownloading average speed:%.2f(KB/s),current speed:%.2f(KB/s),max speed:%.2f(KB/s)", averSpeed, nowSpeed, maxSpeed)
1527+
}
1528+
}()
1529+
1530+
completed := 0
1531+
for completed < len(parts) {
1532+
select {
1533+
case part := <-results:
1534+
completed++
1535+
_ = (part.End - part.Start + 1)
1536+
case err := <-failed:
1537+
close(die)
1538+
return err
1539+
}
1540+
if completed >= len(parts) {
1541+
break
1542+
}
1543+
}
1544+
1545+
nowTick := time.Now().UnixNano() / 1000 / 1000
1546+
nowStat := statBandwidth.GetStat()
1547+
averSpeed := float64(nowStat.TotalBytes/1024) / float64((nowTick-nowStat.StartTick)/1000)
1548+
//total := float64(objectSize)
1549+
1550+
fmt.Printf("\ndownload-speed part-size:%v, parallel:%v total bytes:%v, cost:%.3f s, avg speed:%.2f(kB/s)\n", partSize, parallel, nowStat.TotalBytes, float64(nowTick-nowStat.StartTick)/1000, averSpeed)
1551+
1552+
return nil
1553+
}

0 commit comments

Comments
 (0)