go - Why do these goroutines not scale their performance from more concurrent executions?
Background

I am working on my bachelor thesis; my task is to optimise given Go code, i.e. make it run as fast as possible. As a first step I optimised the serial function, then tried to introduce parallelism via goroutines. After researching on the internet I now understand the difference between concurrency and parallelism, thanks to the slides from talks.golang. I have also visited some parallel programming courses where we parallelised C/C++ code with pthreads/OpenMP, so I tried to apply those paradigms in Go. That said, in this particular case I am optimising a function which computes the moving average of a slice of length len := n + (window_size - 1) (it equals either 9393 or 10175); hence we have n windows for which we compute the corresponding arithmetic average and save it in the output slice.

Note that this task is inherently embarrassingly parallel.
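For reference, in the notation of the code below (writing w for window_size), each output element is

    output[i] = ( input[i-w+1] + ... + input[i] ) / w

and, once the first window is full, it can be updated incrementally, which is what the serial variants below exploit:

    output[i] = output[i-1] + ( input[i] - input[i-w] ) / w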
My optimisation attempts and results
In moving_avg_concurrent2 I split the slice into num_goroutines smaller pieces and ran each piece in one goroutine. With a single goroutine this function performed better than moving_avg_serial4, for some reason I have not been able to find out yet (but that is a tangent here); with more than one goroutine, however, it started to perform worse than moving_avg_serial4.

In moving_avg_concurrent3 I adopted the master/worker paradigm. Here the performance was worse than moving_avg_serial4 when using one goroutine. At least the performance got better when increasing num_goroutines, but it still did not beat moving_avg_serial4.

To compare the performances of moving_avg_serial4, moving_avg_concurrent2 and moving_avg_concurrent3 I wrote a benchmark and tabulated the results:
fct & num_goroutines | timing in ns/op | percentage
---------------------------------------------------
serial4              |         4357893 |    100.00%
concur2_1            |         5174818 |    118.75%
concur2_4            |         9986386 |    229.16%
concur2_8            |        18973443 |    435.38%
concur2_32           |        75602438 |   1734.84%
concur3_1            |        32423150 |    744.01%
concur3_4            |        21083897 |    483.81%
concur3_8            |        16427430 |    376.96%
concur3_32           |        15157314 |    347.81%
Question
Since, as mentioned above, this problem is embarrassingly parallel, I was expecting to see a tremendous performance increase, but that was not the case.

Why does moving_avg_concurrent2 not scale at all?
And why is moving_avg_concurrent3 that much slower than moving_avg_serial4?
I know that goroutines are cheap but still not free; is it possible that this generates so much overhead that it is even slower than moving_avg_serial4?
Code

Functions:
// package clause and imports are implied by the excerpts below;
// the package name is only a placeholder to make them compile
package movavg

import (
    "fmt"
    "math"
    "sync"
)

// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise the buffer with NaNs
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i, val := range input {
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = val
            if !nan_in_slice(buffer) && first_time { // nan_in_slice is a helper not shown here
                sum := 0.0
                for _, entry := range buffer {
                    sum += entry
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) && !nan_in_slice(buffer) {
                output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without a loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// reordering the control structures to exploit short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise the buffer with NaNs
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i := range input {
            // fmt.Printf("in mvg_avg4: i=%v\n", i)
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
            if first_time && !nan_in_slice(buffer) {
                sum := 0.0
                for j := range buffer {
                    sum += buffer[j]
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) /* && !nan_in_slice(buffer)*/ {
                output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without a loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// splitting the slice into smaller pieces for the goroutines, but without using the serial version,
// i.e. we only have NaNs in the beginning, hoping to reduce the overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_items := len(input) - (window_size - 1)
        var barrier_wg sync.WaitGroup
        n := num_items / num_goroutines

        go_avg := make([][]float64, num_goroutines)
        for i := 0; i < num_goroutines; i++ {
            go_avg[i] = make([]float64, 0, num_goroutines)
        }

        for i := 0; i < num_goroutines; i++ {
            barrier_wg.Add(1)
            go func(go_id int) {
                defer barrier_wg.Done()

                // computing the boundaries
                var start, stop int
                start = go_id*int(n) + (window_size - 1) // starting index
                // ending index
                if go_id != (num_goroutines - 1) {
                    stop = start + n // ending index
                } else {
                    stop = num_items + (window_size - 1) // ending index
                }

                loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

                // note: the result above is discarded, loc_avg is re-allocated and re-computed below
                loc_avg = make([]float64, stop-start)
                current_sum := 0.0
                for i := start - (window_size - 1); i < start+1; i++ {
                    current_sum += input[i]
                }
                loc_avg[0] = current_sum / float64(window_size)

                idx := 1
                for i := start + 1; i < stop; i++ {
                    loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
                    idx++
                }

                go_avg[go_id] = append(go_avg[go_id], loc_avg...)
            }(i)
        }
        barrier_wg.Wait()

        for i := 0; i < num_goroutines; i++ {
            output = append(output, go_avg[i]...)
        }
    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// change of paradigm: we opt for a master/worker pattern and spawn each window to be computed by a goroutine
func compute_window_avg(input, output []float64, start, end int) {
    sum := 0.0
    size := end - start
    for _, val := range input[start:end] {
        sum += val
    }
    output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_windows := len(input) - (window_size - 1)
        var output = make([]float64, len(input)) // note: this shadows the 'output' declared above
        for i := 0; i < window_size-1; i++ {
            output[i] = math.NaN()
        }

        pending := make(chan *work) // the work type, newwork and worker helpers are defined elsewhere and not shown in this post
        done := make(chan *work)

        // creating the work
        go func() {
            for i := 0; i < num_windows; i++ {
                pending <- newwork(compute_window_avg, input, output, i, i+window_size)
            }
        }()

        // start the goroutines, which work through pending until there is nothing left
        for i := 0; i < num_goroutines; i++ {
            go func() {
                worker(pending, done)
            }()
        }

        // wait until every piece of work is done
        for i := 0; i < num_windows; i++ {
            <-done
        }

        return output
    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}
Benchmarks:
//############### BENCHMARKS ###############

var import_data_res11 []float64

func benchmarkMoving_avg_serial(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial(backtest_res.f["trading drawdowns"], window) // backtest_res holds the input data, loaded elsewhere
    }
    import_data_res11 = r
}

var import_data_res14 []float64

func benchmarkMoving_avg_serial4(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial4(backtest_res.f["trading drawdowns"], window)
    }
    import_data_res14 = r
}

var import_data_res16 []float64

func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent2(backtest_res.f["trading drawdowns"], window, num_goroutines)
    }
    import_data_res16 = r
}

var import_data_res17 []float64

func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent3(backtest_res.f["trading drawdowns"], window, num_goroutines)
    }
    import_data_res17 = r
}

func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
    benchmarkMoving_avg_serial(b, 261*10)
}

func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
    benchmarkMoving_avg_serial4(b, 261*10)
}

func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}

func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}

func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}

func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}

//############### BENCHMARKS END ###############
Remarks:

This is my very first post, I am still learning, so any constructive criticism is also welcome.
Answer

Fact #0: Premature optimisation efforts quite often have negative yields, showing that they are just a waste of time and effort.

Why?
single "wrong" sloc may devastate performance more +37%
or may improve performance spend less -57% of baseline processing time
51.151µs on ma(200) [10000]float64 ~ 22.017µs on ma(200) [10000]int 70.325µs on ma(200) [10000]float64
why []int
-s?
see on own above - bread , butter hpc/fintech efficient sub-[us] processing strategies ( , still speak in terms of [serial]
process scheduling ).
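For what it's worth, a float64-vs-int comparison of that kind can be sketched with two ordinary Go benchmarks. The helpers maInt / maFloat and the synthetic random input below are my own assumptions for illustration, not the code behind the figures above:

package ma_test

import (
    "math/rand"
    "testing"
)

// maInt computes a running-sum moving average over int input.
func maInt(in []int, w int) []int {
    out := make([]int, len(in))
    sum := 0
    for i, v := range in {
        sum += v
        if i >= w {
            sum -= in[i-w] // drop the element that left the window
        }
        if i >= w-1 {
            out[i] = sum / w
        }
    }
    return out
}

// maFloat is the same algorithm over float64 input.
func maFloat(in []float64, w int) []float64 {
    out := make([]float64, len(in))
    sum := 0.0
    for i, v := range in {
        sum += v
        if i >= w {
            sum -= in[i-w]
        }
        if i >= w-1 {
            out[i] = sum / float64(w)
        }
    }
    return out
}

// sinks prevent the compiler from optimising the calls away
var sinkI []int
var sinkF []float64

func BenchmarkMAInt_200_10000(b *testing.B) {
    in := make([]int, 10000)
    for i := range in {
        in[i] = rand.Intn(1000)
    }
    b.ResetTimer()
    for n := 0; n < b.N; n++ {
        sinkI = maInt(in, 200)
    }
}

func BenchmarkMAFloat_200_10000(b *testing.B) {
    in := make([]float64, 10000)
    for i := range in {
        in[i] = rand.Float64()
    }
    b.ResetTimer()
    for n := 0; n < b.N; n++ {
        sinkF = maFloat(in, 200)
    }
}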
Next comes the harder part:

Fact #1: This task is NOT embarrassingly parallel
Yes, one may go and implement a moving average calculation so that it indeed proceeds through the heaps of data using some intentionally indoctrinated "just"-[CONCURRENT] processing approach (be it due to some kind of error, some authority's "advice", professional blindness or simply dual-Socrates-fair ignorance). That does not mean, however, that the nature of the convolutional stream-processing, present inside the moving average mathematical formulation, has forgotten to be a pure [SERIAL] process, just because of an attempt to force it to be calculated within some degree of "just"-[CONCURRENT] processing.

(Btw. the hard computer-scientists and dual-domain nerds will also object here that the go-language is, by design, using Rob Pike's best skills to provide a framework of concurrent coroutines, not any true-[PARALLEL] process-scheduling, even though Hoare's CSP-tools, available in the language concept, may add some salt and pepper and introduce a stop-block type of inter-process communication tools that will block "just"-[CONCURRENT] code sections into some hardwired CSP-p2p-synchronisation.)
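To see how this bites the moving_avg_concurrent2 approach above: every chunk has to re-seed its first window with a full O(window_size) sum before it can use the O(1) recurrence out[i] = out[i-1] + (in[i] - in[i-w]) / w, and with the question's sizes (window_size = 2610, roughly 7.5 k windows) that replicated work quickly dominates. A back-of-the-envelope sketch - the helper replicatedSeedShare and the rounded sizes are mine, used purely for illustration:

package main

import "fmt"

// replicatedSeedShare estimates which share of all additions is spent merely
// re-building the first window of each chunk when the recurrence is split
// across g goroutines (n = number of output windows, w = window size).
func replicatedSeedShare(n, w, g int) float64 {
    useful := float64(n)   // one O(1) update per window in a single serial pass
    seed := float64(g * w) // each chunk re-does one full O(w) window sum
    return seed / (useful + seed)
}

func main() {
    // roughly the question's sizes: ~7566 windows, window_size = 261*10
    for _, g := range []int{1, 4, 8, 32} {
        fmt.Printf("goroutines=%2d  share of work spent re-seeding ~ %.0f%%\n",
            g, 100*replicatedSeedShare(7566, 2610, g))
    }
}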
Fact #2: Go distributed (for any kind of speedup) only at the end

Having a poor level of performance in [SERIAL] does not set any yardstick. Only with a reasonable amount of performance tuning in single-thread may one benefit from going distributed (still having to pay additional serial costs, which makes Amdahl's Law - or rather an overhead-strict Amdahl's Law - enter the game).

If one can introduce such a low level of additional setup-overheads and still achieve any remarkable parallelism, scaled into the non-[SEQ] part of the processing, there and only there comes a chance to increase the effective performance of the process.
It is not hard to lose much more than to gain here, so always benchmark the pure-[SEQ] baseline against the potential tradeoffs between a non-[SEQ] / N[PAR]_processes theoretical, overhead-naive speedup, for which one will pay the cost of a sum of all add-on-[SEQ]-overheads; going parallel pays off if and only if:

( pure-[SEQ]_processing [ns] + add-on-[SEQ]-setup-overheads [ns] + ( non-[SEQ]_processing [ns] / N[PAR]_processes ) ) << ( pure-[SEQ]_processing [ns] + ( non-[SEQ]_processing [ns] / 1 ) )
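Written as a tiny Go helper, that decision rule reads as follows; the function name and the example figures in main are illustrative only, the real [ns] numbers must come from measurements:

package main

import "fmt"

// worthGoingParallel evaluates the overhead-strict condition stated above:
// going parallel only pays off if the left-hand side stays (much) smaller
// than the right-hand side. All durations are in nanoseconds.
func worthGoingParallel(pureSeqNs, addOnSeqSetupNs, nonSeqNs float64, nParProcesses int) bool {
    lhs := pureSeqNs + addOnSeqSetupNs + nonSeqNs/float64(nParProcesses)
    rhs := pureSeqNs + nonSeqNs/1.0
    return lhs < rhs // ideally lhs << rhs, not just barely below it
}

func main() {
    // purely illustrative numbers, NOT measurements from the question
    fmt.Println(worthGoingParallel(1_000, 50_000, 4_000_000, 8)) // true: the overhead is amortised
    fmt.Println(worthGoingParallel(1_000, 50_000, 40_000, 8))    // false: the overhead dominates
}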
Not having the jet-fighters' advantage of both surplus height and the Sun behind you, never attempt any kind of HPC / parallelisation effort - it will never pay for itself if it is not remarkably << better than a smart [SEQ]-process.
Epilogue: an interactive experiment UI for the overhead-strict Amdahl's Law

One animation is worth a million words.

An interactive animation is even better:
So, assume a process-under-test which has both a [SERIAL] and a [PARALLEL] part in its process schedule.

Let p be the [PARALLEL] fraction of the process duration ~ ( 0.0 .. 1.0 ), so the [SERIAL] part does not last longer than ( 1 - p ), right?
So, let's start the interactive experimentation from such a test-case where p == 1.0, meaning all of the process duration is spent in the [PARALLEL] part, and both the initial and the terminating parts of the process-flow (which are in principle always [SERIAL]) have zero duration ( ( 1 - p ) == 0. ).
Assume the system does no particular magic and thus needs to spend some real steps on the initialisation of each [PARALLEL] part in order to run it on a different processor ( (1), 2, .., N ), so let's add some overheads, if asked to re-organise the process flow and to marshal + distribute + un-marshal all the necessary instructions and data, so that the intended process can now start and run on N processors in parallel.

These costs are called o (here assumed, for simplicity, to be constant and invariant to N, which is not always the case in reality, be it on silicon / on NUMA / on distributed infrastructures).
By clicking on the Epilogue headline above, an interactive environment opens and is free for one's own experimentation.
With p == 1. && o == 0. && N > 1 the performance grows steeply up to the currently achievable [PARALLEL]-hardware O/S limits for a still monolithic O/S code-execution (and still with no additional distribution costs for MPI- and similar depeche-mode distributions of work-units, where one would immediately have to add an indeed big number of [ms], while our so far best [SERIAL] implementation has done the whole job in less than ~ 22.1 [us]).

But except for such an artificially optimistic case, the job does not look so cheap to get efficiently parallelised.
Try having a non-zero, say just ~ 0.01%, setup overhead cost o, and the curve starts to show a very different nature of the overhead-aware scaling, even for the utmost extreme [PARALLEL] case (still having p == 1.0), with the potential speedup now somewhere near just half of the super-idealistic linear speedup case.

Now turn p closer to reality, somewhere less artificial than the initial super-idealistic setting of == 1.00 --> { 0.99, 0.98, 0.95 }, and ... bingo, this is the reality against which process-scheduling ought to be tested and pre-validated.
What does that mean?
As an example, if the overhead (of launching + finally joining a pool of coroutines) took more than just ~ 0.1% of the actual [PARALLEL] processing section duration, there would be no speedup bigger than about 4x (about 1/4 of the original duration in time) for 5 coroutines (having p ~ 0.95), and not more than about 10x (a 10-times shorter duration) for 20 coroutines - and all that assumes the system has 5 CPU cores, resp. 20 CPU cores, free & available and ready (best with O/S-level CPU-core-affinity mapped processes / threads) for uninterrupted serving of all those coroutines during their whole life-span, so as to achieve any of the above expected speedups.
Not having such an amount of hardware resources free and ready for all the task-units intended to implement the [PARALLEL] part of the process-schedule, the blocking/waiting states introduce additional absolute wait-states, the resulting performance adds these new-[SERIAL]-blocking/waiting sections to the overall process duration, and the initially wished-for speedups suddenly cease to exist, with the performance factor falling well below << 1.00 (meaning that the effective run-time is, due to the blocking-states, way slower than the non-parallelised just-[SERIAL] workflow).
This may sound complicated for new keen experimenters, so let us put it in a reversed perspective. Given that the whole process of distributing the intended [PARALLEL] pool-of-tasks is known not to be shorter than, say, about 10 [us], the overhead-strict graphs show that there needs to be at least about 1000 x 10 [us] of non-blocking, computing-intensive processing inside the [PARALLEL] section so as not to devastate the efficiency of the parallelised processing.
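(That factor of about 1000 is simply the ~ 0.1% overhead threshold rearranged: to keep a ~ 10 [us] distribution overhead below 0.1% of the [PARALLEL] section, that section must last at least 10 [us] / 0.001 = 10 [ms] = 1000 x 10 [us].)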
If there is no sufficiently "fat" piece of processing, the overhead costs (going remarkably above the threshold of ~ 0.1% cited above) brutally devastate the net efficiency of the parallelised processing (having performed it at such unjustifiably high relative costs of the setup vs the limited net effects of any N processors, as demonstrated in the available live-graphs).
It is no surprise for the distributed-computing nerds that the overhead o also comes with additional dependencies - on N (the more processes, the more effort is to be spent on distributing the work-packages), on the marshalled data-BLOBs' sizes (the larger the BLOBs, the longer the MEM-/IO-devices remain blocked before serving the next process to receive the distributed BLOB across such a device/resource, for each of the target 2..N-th receiving processes), and on the avoided / CSP-signalled, channel-mediated inter-process coordinations (call it additional per-incident blocking, reducing p further and further below the ultimately nice ideal of 1.).
So, the real-world reality is rather very far from the initially idealised, nice and promising p == 1.0, ( 1 - p ) == 0.0 and o == 0.0.
As was obvious from the very beginning, try to beat the ~ 22.1 [us] [SERIAL] threshold first, rather than trying to beat it by going [PARALLEL] with realistic overheads and poor scaling while using already under-performing approaches - that way things only get worse and worse and it does not help a single bit.