18 changes: 15 additions & 3 deletions benchmark_runner/benchmark_runner.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"flag"
"fmt"
"golang.org/x/time/rate"
"io/ioutil"
"log"
"math"
@@ -30,6 +31,7 @@ const (
WorkerPerQueue = 0
// SingleQueue is the value for using a single shared queue across all workers
SingleQueue = 1
Inf = rate.Limit(math.MaxFloat64)
)

// BenchmarkRunner is responsible for initializing and storing common
@@ -287,10 +289,19 @@ func (l *BenchmarkRunner) RunBenchmark(b Benchmark, workQueues uint) {

channels := l.createChannels(workQueues)
// Launch all worker processes in background

var requestRate = Inf
var requestBurst = 1
if l.maxRPS != 0 {
requestRate = rate.Limit(l.maxRPS)
requestBurst = int(l.workers) //int(b.workers)
}
var rateLimiter = rate.NewLimiter(requestRate, requestBurst)

var wg sync.WaitGroup
for i := 0; i < int(l.workers); i++ {
wg.Add(1)
go l.work(b, &wg, channels[i%len(channels)], i)
go l.work(b, &wg, channels[i%len(channels)], i, rateLimiter, l.maxRPS != 0)
}

w := new(tabwriter.Writer)
@@ -318,6 +329,7 @@ func (l *BenchmarkRunner) RunBenchmark(b Benchmark, workQueues uint) {
l.testResult.OverallQuantiles = l.GetOverallQuantiles()
l.testResult.Limit = l.limit
l.testResult.Workers = l.workers
l.testResult.MaxRps = l.maxRPS
l.summary()
}

@@ -380,7 +392,7 @@ func (l *BenchmarkRunner) scan(b Benchmark, channels []*duplexChannel, start tim
}

// work is the processing function for each worker in the loader
func (l *BenchmarkRunner) work(b Benchmark, wg *sync.WaitGroup, c *duplexChannel, workerNum int) {
func (l *BenchmarkRunner) work(b Benchmark, wg *sync.WaitGroup, c *duplexChannel, workerNum int, rateLimiter *rate.Limiter, useRateLimiter bool) {

// Prepare processor
proc := b.GetProcessor()
@@ -389,7 +401,7 @@ func (l *BenchmarkRunner) work(b Benchmark, wg *sync.WaitGroup, c *duplexChannel
// Process batches coming from duplexChannel.toWorker queue
// and send ACKs into duplexChannel.toScanner queue
for b := range c.toWorker {
stats := proc.ProcessBatch(b, l.doLoad)
stats := proc.ProcessBatch(b, l.doLoad, rateLimiter, useRateLimiter)
cmdStats := stats.CmdStats()
for pos := 0; pos < len(cmdStats); pos++ {
cmdStat := cmdStats[pos]
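For context on how this wiring behaves: with `--max-rps` left at 0 the limiter is built with an effectively infinite rate, so every reservation returns a zero delay and workers run unthrottled; otherwise the rate is `maxRPS` tokens per second with a burst of one token per worker, letting all workers fire at once before per-second pacing kicks in. A self-contained sketch of that setup, with demo values rather than the runner's flags (note that `golang.org/x/time/rate` already exports `rate.Inf`, which equals the `rate.Limit(math.MaxFloat64)` constant this PR defines):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Demo stand-ins for the runner's l.maxRPS and l.workers fields.
	maxRPS, workers := uint64(10), 4

	// Same shape as RunBenchmark: infinite rate unless a cap was given.
	limit, burst := rate.Inf, 1
	if maxRPS != 0 {
		limit, burst = rate.Limit(maxRPS), workers
	}
	limiter := rate.NewLimiter(limit, burst)

	// 4 burst tokens are free; the remaining 16 ops are paced at 10/s,
	// so the loop should take roughly 1.6 seconds.
	start := time.Now()
	for i := 0; i < 20; i++ {
		time.Sleep(limiter.ReserveN(time.Now(), 1).Delay())
	}
	fmt.Printf("20 ops in %v\n", time.Since(start).Round(10*time.Millisecond))
}
```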
1 change: 1 addition & 0 deletions benchmark_runner/indexing_result.go
@@ -28,6 +28,7 @@ type TestResult struct {
ResultFormatVersion string `json:"ResultFormatVersion"`
Limit uint64 `json:"Limit"`
Workers uint `json:"Workers"`
MaxRps uint64 `json:"MaxRps"`

// DB Specific Configs
DBSpecificConfigs map[string]interface{} `json:"DBSpecificConfigs"`
4 changes: 3 additions & 1 deletion benchmark_runner/processor.go
@@ -1,11 +1,13 @@
package benchmark_runner

import "golang.org/x/time/rate"

// Processor is a type that processes the work for a loading worker
type Processor interface {
// Init does per-worker setup needed before receiving databuild
Init(workerNum int, doLoad bool, totalWorkers int)
// ProcessBatch handles a single batch of databuild
ProcessBatch(b Batch, doLoad bool) Stat
ProcessBatch(b Batch, doLoad bool, rateLimiter *rate.Limiter, useRateLimiter bool) Stat
}

// ProcessorCloser is a Processor that also needs to close or cleanup afterwards
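Every implementation of the interface now receives the shared limiter and its on/off flag through `ProcessBatch`. A compile-alone sketch of a conforming processor; `Batch` and `Stat` here are local stand-ins for the repo's types, not their real definitions:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/time/rate"
)

// Stand-ins so the sketch compiles on its own.
type Batch interface{ Len() int }
type Stat struct{ Cmds int }

type sliceBatch []string

func (s sliceBatch) Len() int { return len(s) }

// Processor restates the updated contract from this diff.
type Processor interface {
	Init(workerNum int, doLoad bool, totalWorkers int)
	ProcessBatch(b Batch, doLoad bool, rateLimiter *rate.Limiter, useRateLimiter bool) Stat
}

type noopProcessor struct{}

func (noopProcessor) Init(int, bool, int) {}

// ProcessBatch throttles once per item when the flag is set.
func (noopProcessor) ProcessBatch(b Batch, _ bool, rl *rate.Limiter, useRL bool) Stat {
	for i := 0; i < b.Len(); i++ {
		if useRL {
			_ = rl.Wait(context.Background()) // blocking wait; the PR's processor uses ReserveN+Sleep instead
		}
	}
	return Stat{Cmds: b.Len()}
}

func main() {
	var p Processor = noopProcessor{}
	fmt.Println(p.ProcessBatch(sliceBatch{"a", "b"}, true, rate.NewLimiter(rate.Inf, 1), true).Cmds)
}
```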
15 changes: 10 additions & 5 deletions cmd/ftsb_redisearch/cmd_processor.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/RediSearch/ftsb/benchmark_runner"
"github.com/mediocregopher/radix/v3"
"golang.org/x/time/rate"
"log"
"strconv"
"strings"
@@ -48,7 +49,7 @@ func (p *processor) Init(workerNumber int, _ bool, totalWorkers int) {
}
}

func connectionProcessor(p *processor) {
func connectionProcessor(p *processor, rateLimiter *rate.Limiter, useRateLimiter bool) {
cmdSlots := make([][]radix.CmdAction, 0, 0)
timesSlots := make([][]time.Time, 0, 0)
clusterSlots := make([][2]uint16, 0, 0)
@@ -76,7 +77,11 @@
}
}
if debug > 2 {
fmt.Println(keyPos, key, clusterSlot, cmd, strings.Join(docFields,","), slotP, clusterSlots)
fmt.Println(keyPos, key, clusterSlot, cmd, strings.Join(docFields, ","), slotP, clusterSlots)
}
if useRateLimiter {
r := rateLimiter.ReserveN(time.Now(), int(1))
time.Sleep(r.Delay())
}
if !clusterMode {
cmdSlots[slotP], timesSlots[slotP] = sendFlatCmd(p, p.vanillaClient, cmdType, cmdQueryId, cmd, docFields, bytelen, 1, cmdSlots[slotP], timesSlots[slotP])
@@ -150,7 +155,7 @@ func sendIfRequired(p *processor, client radix.Client, cmdType string, cmdQueryI
}

// ProcessBatch reads eventsBatches which contain rows of databuild for FT.ADD redis command string
func (p *processor) ProcessBatch(b benchmark_runner.Batch, doLoad bool) (outstat benchmark_runner.Stat) {
func (p *processor) ProcessBatch(b benchmark_runner.Batch, doLoad bool, rateLimiter *rate.Limiter, useRateLimiter bool) (outstat benchmark_runner.Stat) {
outstat = *benchmark_runner.NewStat()
events := b.(*eventsBatch)
rowCnt := uint64(len(events.rows))
@@ -161,7 +166,7 @@ func (p *processor) ProcessBatch(b benchmark_runner.Batch, doLoad bool) (outstat
p.wg = &sync.WaitGroup{}
p.rows = make(chan string, buflen)
p.wg.Add(1)
go connectionProcessor(p)
go connectionProcessor(p, rateLimiter, useRateLimiter)
for _, row := range events.rows {
p.rows <- row
}
@@ -194,7 +199,7 @@ func preProcessCmd(row string) (cmdType string, cmdQueryId string, keyPos int, c
cmdType = argsStr[0]
cmdQueryId = argsStr[1]
keyPos, _ = strconv.Atoi(argsStr[2])
keyPos = keyPos+3
keyPos = keyPos + 3
cmd = argsStr[3]
if len(argsStr) > 4 {
args = argsStr[4:]
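The throttle above reserves a single token and then sleeps for the suggested delay, rather than blocking inside the limiter. A standalone sketch of that exact `ReserveN` + `Sleep(r.Delay())` pattern, at an illustrative 5 ops/sec:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 5 tokens/sec, burst of 1: the first op is free, each later
	// reservation suggests a ~200ms delay that we honor with Sleep —
	// the same shape used before every Redis command above.
	limiter := rate.NewLimiter(rate.Limit(5), 1)
	start := time.Now()
	for i := 0; i < 5; i++ {
		r := limiter.ReserveN(time.Now(), 1)
		time.Sleep(r.Delay())
		fmt.Printf("op %d at %v\n", i, time.Since(start).Round(10*time.Millisecond))
	}
}
```

One trade-off: a reservation cannot be cancelled once taken, whereas `Limiter.Wait(ctx)` honors a context; for a fire-and-forget load generator the sleep-based form is simpler.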
1 change: 1 addition & 0 deletions go.mod
@@ -8,4 +8,5 @@ require (
github.com/mediocregopher/radix/v3 v3.5.2
github.com/mitchellh/gox v1.0.1 // indirect
github.com/tcnksm/ghr v0.13.0 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
)
1 change: 1 addition & 0 deletions go.sum
@@ -437,6 +437,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -312,7 +312,7 @@ def generate_benchmark_commands():
help='the total ratio of updates ( FT.ADD with REPLACE ). The Aggregate ratio will be given by (1 - update-ratio)')
parser.add_argument('--seed', type=int, default=12345,
help='the random seed used to generate random deterministic outputs')
parser.add_argument('--doc-limit', type=int, default=10000000,
parser.add_argument('--doc-limit', type=int, default=1000000,
help='the total documents to generate to be added in the setup stage')
parser.add_argument('--total-benchmark-commands', type=int, default=1000000,
help='the total commands to generate to be issued in the benchmark stage')
@@ -322,7 +322,7 @@
help='the maximum number of random @nodeId:\{...\}\'s to be queried per aggregate command')
parser.add_argument('--index-name', type=str, default="inventory",
help='the name of the RediSearch index to be used')
parser.add_argument('--test-name', type=str, default="10M-ecommerce-inventory", help='the name of the test')
parser.add_argument('--test-name', type=str, default="1M-ecommerce-inventory", help='the name of the test')
parser.add_argument('--test-description', type=str,
default="benchmark focused on updates and aggregate performance",
help='the full description of the test')
@@ -278,14 +278,17 @@ def generate_ft_search_row(index,query_name,query):
print("\n")
print("-- generating the setup commands -- \n")
progress = tqdm(unit="docs", total=args.doc_limit)
total_docs
for doc in docs:
doc_limit = args.doc_limit
if doc_limit == 0:
doc_limit = len(docs)
while total_docs < doc_limit:
total_docs=total_docs+1
random_doc_pos = random.randint(0, len(docs) - 1)
doc = docs[random_doc_pos]
cmd = use_case_to_cmd(use_ftadd,doc["title"],doc["url"],doc["abstract"],total_docs)
progress.update()
setup_csv_writer.writerow(cmd)
all_csv_writer.writerow(cmd)
if args.doc_limit > 0 and total_docs >= args.doc_limit:
break

progress.close()
all_csvfile.close()
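The reworked setup loop no longer walks the corpus once: it samples documents at random, with replacement, until it has emitted `doc-limit` rows, and treats a limit of 0 as one corpus-length pass. A minimal Go rendering of that loop's logic (hypothetical names, not from the script):

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampleDocs picks random documents (with replacement) until docLimit
// rows have been emitted; a limit of 0 falls back to the corpus size.
func sampleDocs(docs []string, docLimit int) []string {
	if docLimit == 0 {
		docLimit = len(docs)
	}
	out := make([]string, 0, docLimit)
	for totalDocs := 0; totalDocs < docLimit; totalDocs++ {
		out = append(out, docs[rand.Intn(len(docs))])
	}
	return out
}

func main() {
	fmt.Println(sampleDocs([]string{"a", "b", "c"}, 5))
}
```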
16 changes: 8 additions & 8 deletions scripts/datagen_redisearch/nyc_taxis/ftsb_generate_nyc_taxis.py
@@ -76,18 +76,18 @@ def use_case_csv_row_to_cmd(row, index_types, use_ftadd, total_amount_pos, impro
tolls_amount_pos, dropoff_longitude_pos, dropoff_latitude_pos, passenger_count_pos,
fare_amount_pos, extra_pos, trip_distance_pos, tip_amount_pos, store_and_fwd_flag_pos,
payment_type_pos, mta_tax_pos, vendor_id_pos):
pickup_location_long = None if pickup_longitude_pos is None else str_to_float_or_zero(row[pickup_longitude_pos])
pickup_location_lat = None if pickup_latitude_pos is None else str_to_float_or_zero(row[pickup_latitude_pos])
if pickup_location_lat < -90.0 or pickup_location_lat > 90:
pickup_location_long = None if pickup_longitude_pos is None or pickup_longitude_pos > (len(row)-1) else str_to_float_or_zero(row[pickup_longitude_pos])
pickup_location_lat = None if pickup_latitude_pos is None or pickup_latitude_pos > (len(row)-1) else str_to_float_or_zero(row[pickup_latitude_pos])
if pickup_location_lat is not None and (pickup_location_lat < -90.0 or pickup_location_lat > 90):
pickup_location_lat = 0
if pickup_location_long < -180.0 or pickup_location_long > 180:
if pickup_location_long is not None and (pickup_location_long < -180.0 or pickup_location_long > 180):
pickup_location_long = 0

dropoff_location_long = None if dropoff_longitude_pos is None else str_to_float_or_zero(row[dropoff_longitude_pos])
dropoff_location_lat = None if dropoff_latitude_pos is None else str_to_float_or_zero(row[dropoff_latitude_pos])
if dropoff_location_lat < -90.0 or dropoff_location_lat > 90:
dropoff_location_long = None if dropoff_longitude_pos is None or dropoff_longitude_pos > (len(row)-1) else str_to_float_or_zero(row[dropoff_longitude_pos])
dropoff_location_lat = None if dropoff_latitude_pos is None or dropoff_latitude_pos > (len(row)-1) else str_to_float_or_zero(row[dropoff_latitude_pos])
if dropoff_location_lat is not None and ( dropoff_location_lat < -90.0 or dropoff_location_lat > 90 ):
dropoff_location_lat = 0
if dropoff_location_long < -180.0 or dropoff_location_long > 180:
if dropoff_location_long is not None and ( dropoff_location_long < -180.0 or dropoff_location_long > 180 ):
dropoff_location_long = 0

hash = {
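These changes close two crash paths: a column position past the end of a short row (now treated as missing rather than raising IndexError) and comparing None with a float, which is a TypeError under Python 3. A rough Go equivalent of the guarded parse-and-clamp, with a hypothetical helper standing in for `str_to_float_or_zero`:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseCoord mirrors the guarded lookup: a nil position, or one past the
// end of the row, yields nil (missing); unparsable values become 0; and
// values outside [min, max] are clamped to 0, as the script does.
func parseCoord(row []string, pos *int, min, max float64) *float64 {
	if pos == nil || *pos > len(row)-1 {
		return nil
	}
	v, err := strconv.ParseFloat(row[*pos], 64)
	if err != nil {
		v = 0
	}
	if v < min || v > max {
		v = 0
	}
	return &v
}

func main() {
	lonPos := 1
	row := []string{"id", "-73.98"}
	if lon := parseCoord(row, &lonPos, -180, 180); lon != nil {
		fmt.Println(*lon)
	}
}
```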