go - Why do these goroutines not scale their performance from more concurrent executions?


Background

I am working on my bachelor thesis, and my task is to optimise a given piece of Go code, i.e. to make it run as fast as possible. First, I optimised the serial function and then tried to introduce parallelism via goroutines. After researching on the internet, I now understand the difference between concurrency and parallelism, thanks to the slides on talks.golang. I attended some parallel programming courses in which we parallelised C/C++ code with the help of pthreads/OpenMP, so I tried to apply these paradigms in Go. That said, in this particular case I am optimising a function that computes the moving average of a slice of length len := n + (window_size - 1) (it equals either 9393 or 10175). Hence we have n windows, and for each of them we compute the arithmetic average and save it in the output slice.

Note that this task is inherently embarrassingly parallel.
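To make the task concrete, here is a minimal sketch of a moving average computed with a running sum, so each output element costs one addition and one subtraction. The function name movingAverage and the convention of filling the first window-1 positions with NaN are assumptions made for illustration; this is not the code from the thesis.

```go
package main

import (
	"fmt"
	"math"
)

// movingAverage (hypothetical name) returns a slice of len(input) in which
// out[i] is the mean of input[i-window+1 .. i]. The first window-1 entries
// are NaN because no complete window exists for them yet.
func movingAverage(input []float64, window int) []float64 {
	out := make([]float64, len(input))
	for i := range out {
		out[i] = math.NaN()
	}
	if window <= 0 || window > len(input) {
		return out // no complete window fits into the input
	}
	sum := 0.0
	for i, v := range input {
		sum += v
		if i >= window {
			sum -= input[i-window] // drop the element that left the window
		}
		if i >= window-1 {
			out[i] = sum / float64(window)
		}
	}
	return out
}

func main() {
	fmt.Println(movingAverage([]float64{1, 2, 3, 4, 5}, 3))
}
```

Each window after the first reuses the previous sum, which is also why neighbouring outputs depend on each other when computed this way.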

My optimisation attempts and results

In moving_avg_concurrent2 I split the slice into num_goroutines smaller pieces and ran each piece in one goroutine. With one goroutine this function performed, for some reason (I could not find out why yet, but that is a tangent here), better than moving_avg_serial4, but with more than one goroutine it started to perform worse than moving_avg_serial4.
In moving_avg_concurrent3 I adopted the master/worker paradigm. The performance was worse than moving_avg_serial4 when using one goroutine. Here I at least got better performance when increasing num_goroutines, but still not better than moving_avg_serial4. To compare the performance of moving_avg_serial4, moving_avg_concurrent2 and moving_avg_concurrent3 I wrote a benchmark and tabulated the results:

fct & num_goroutines | timing in ns/op | percentage
---------------------------------------------------
          serial4    |         4357893 |   100.00%
          concur2_1  |         5174818 |   118.75%
          concur2_4  |         9986386 |   229.16%
          concur2_8  |        18973443 |   435.38%
          concur2_32 |        75602438 |  1734.84%
          concur3_1  |        32423150 |   744.01%
          concur3_4  |        21083897 |   483.81%
          concur3_8  |        16427430 |   376.96%
          concur3_32 |        15157314 |   347.81%
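For comparison with the chunked approach described for moving_avg_concurrent2, the following is a minimal sketch in which every goroutine writes straight into its own disjoint range of one preallocated output slice, so no per-goroutine result slices have to be allocated and appended afterwards. The name movingAverageChunked and the chunking scheme are assumptions for illustration. Whether this beats a serial version for inputs of about 10,000 elements is not something the sketch demonstrates, since launching and joining goroutines still has a cost.

```go
package main

import (
	"fmt"
	"math"
	"sync"
)

// movingAverageChunked (hypothetical name) splits the output positions that
// have a complete window into contiguous chunks and processes each chunk in
// its own goroutine. Goroutines only read input and only write disjoint
// elements of out, so no locking is needed.
func movingAverageChunked(input []float64, window, workers int) []float64 {
	out := make([]float64, len(input))
	for i := range out {
		out[i] = math.NaN()
	}
	if window <= 0 || window > len(input) || workers <= 0 {
		return out
	}
	n := len(input) - (window - 1) // number of complete windows
	if workers > n {
		workers = n
	}
	chunk := (n + workers - 1) / workers // ceiling division

	var wg sync.WaitGroup
	for start := window - 1; start < len(input); start += chunk {
		stop := start + chunk
		if stop > len(input) {
			stop = len(input)
		}
		wg.Add(1)
		go func(start, stop int) {
			defer wg.Done()
			// The first window of the chunk is summed from scratch ...
			sum := 0.0
			for _, v := range input[start-(window-1) : start+1] {
				sum += v
			}
			out[start] = sum / float64(window)
			// ... and the rest of the chunk reuses the running sum.
			for i := start + 1; i < stop; i++ {
				sum += input[i] - input[i-window]
				out[i] = sum / float64(window)
			}
		}(start, stop)
	}
	wg.Wait()
	return out
}

func main() {
	in := []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fmt.Println(movingAverageChunked(in, 3, 3))
}
```

Only the first window of each chunk is recomputed, so the redundant work grows with the number of chunks rather than with the input length.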

Question

Since, as mentioned above, this problem is embarrassingly parallel, I was expecting to see a tremendous performance increase, but that was not the case.

Why does moving_avg_concurrent2 not scale at all?
And why is moving_avg_concurrent3 so much slower than moving_avg_serial4?
I know that goroutines are cheap but still not free. Is it possible that they generate so much overhead that the result is even slower than moving_avg_serial4?

Code

Functions:

// Requires the imports "fmt", "math" and "sync".
// The helpers nan_in_slice, work, newwork and worker are not shown in the post.

// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i, val := range input {
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = val
            if !nan_in_slice(buffer) && first_time {
                sum := 0.0
                for _, entry := range buffer {
                    sum += entry
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) && !nan_in_slice(buffer) {
                output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// reordering the control structures to exploit the short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
    first_time := true
    var output = make([]float64, len(input))
    if len(input) > 0 {
        var buffer = make([]float64, window_size)
        // initialise buffer with NaN
        for i := range buffer {
            buffer[i] = math.NaN()
        }
        for i := range input {
            //            fmt.Printf("in mvg_avg4: i=%v\n", i)
            old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
            buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
            if first_time && !nan_in_slice(buffer) {
                sum := 0.0
                for j := range buffer {
                    sum += buffer[j]
                }
                output[i] = sum / float64(window_size)
                first_time = false
            } else if i > 0 && !math.IsNaN(output[i-1]) /* && !nan_in_slice(buffer)*/ {
                output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop
            } else {
                output[i] = math.NaN()
            }
        }
    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// splitting the slice into smaller pieces for the goroutines without using the serial version,
// i.e. we only have NaN's in the beginning, in the hope of reducing overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_items := len(input) - (window_size - 1)
        var barrier_wg sync.WaitGroup
        n := num_items / num_goroutines
        go_avg := make([][]float64, num_goroutines)
        for i := 0; i < num_goroutines; i++ {
            go_avg[i] = make([]float64, 0, num_goroutines)
        }

        for i := 0; i < num_goroutines; i++ {
            barrier_wg.Add(1)
            go func(go_id int) {
                defer barrier_wg.Done()

                // computing boundaries
                var start, stop int
                start = go_id*int(n) + (window_size - 1) // starting index
                // ending index
                if go_id != (num_goroutines - 1) {
                    stop = start + n // ending index
                } else {
                    stop = num_items + (window_size - 1) // ending index
                }

                loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

                loc_avg = make([]float64, stop-start)
                current_sum := 0.0
                for i := start - (window_size - 1); i < start+1; i++ {
                    current_sum += input[i]
                }
                loc_avg[0] = current_sum / float64(window_size)
                idx := 1

                for i := start + 1; i < stop; i++ {
                    loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
                    idx++
                }

                go_avg[go_id] = append(go_avg[go_id], loc_avg...)

            }(i)
        }
        barrier_wg.Wait()

        for i := 0; i < num_goroutines; i++ {
            output = append(output, go_avg[i]...)
        }

    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

// returns a slice containing the moving average of the input
// change of paradigm: we opt for a master/worker pattern and spawn all windows, each of which is computed by a goroutine
func compute_window_avg(input, output []float64, start, end int) {
    sum := 0.0
    size := end - start
    for _, val := range input[start:end] {
        sum += val
    }
    output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
    var output = make([]float64, window_size-1, len(input))
    for i := 0; i < window_size-1; i++ {
        output[i] = math.NaN()
    }
    if len(input) > 0 {
        num_windows := len(input) - (window_size - 1)
        var output = make([]float64, len(input))
        for i := 0; i < window_size-1; i++ {
            output[i] = math.NaN()
        }

        pending := make(chan *work)
        done := make(chan *work)

        // creating work
        go func() {
            for i := 0; i < num_windows; i++ {
                pending <- newwork(compute_window_avg, input, output, i, i+window_size)
            }
        }()

        // start goroutines which work through pending till there is nothing left
        for i := 0; i < num_goroutines; i++ {
            go func() {
                worker(pending, done)
            }()
        }

        // wait till every work is done
        for i := 0; i < num_windows; i++ {
            <-done
        }

        return output

    } else { // empty input
        fmt.Println("moving_avg panicking!")
        panic(fmt.Sprintf("%v", input))
    }
    return output
}

Benchmarks:

//############### BENCHMARKS ###############
// Requires the import "testing". backtest_res is defined elsewhere and not shown in the post.
var import_data_res11 []float64

func benchmarkMoving_avg_serial(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial(backtest_res.f["trading drawdowns"], window)
    }
    import_data_res11 = r
}

var import_data_res14 []float64

func benchmarkMoving_avg_serial4(b *testing.B, window int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_serial4(backtest_res.f["trading drawdowns"], window)
    }
    import_data_res14 = r
}

var import_data_res16 []float64

func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent2(backtest_res.f["trading drawdowns"], window, num_goroutines)
    }
    import_data_res16 = r
}

var import_data_res17 []float64

func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
    var r []float64
    for n := 0; n < b.N; n++ {
        r = moving_avg_concurrent3(backtest_res.f["trading drawdowns"], window, num_goroutines)
    }
    import_data_res17 = r
}

func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
    benchmarkMoving_avg_serial(b, 261*10)
}

func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
    benchmarkMoving_avg_serial4(b, 261*10)
}

func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}

func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
    benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}
//############### BENCHMARKS end ###############

Remarks:
This is my first post and I am still learning, so any constructive criticism is welcome.

Fact #0: Premature optimisation efforts often have negative yields,
showing they are just a waste of time & effort


Why?
A single "wrong" SLOC may devastate performance by more than about +37%,
or may improve performance so that it spends less than -57% of the baseline processing time

51.151µs on MA(200) [10000]float64    ~ 22.017µs on MA(200) [10000]int
70.325µs on MA(200) [10000]float64

Why []int-s?
You can see it for yourself above - this is the bread and butter of HPC/fintech efficient sub-[us] processing strategies ( and we still speak in terms of just [SERIAL] process scheduling ).

One may test this at any scale - but rather test first ( here ) your own implementations, at the very same scale - the MA(200) [10000]float64 setup - and post your baseline durations in [us], so as to view the initial process performance and to compare apples-to-apples, having the posted 51.2 [us] threshold to compare against.

Next comes the harder part:


Fact #1: This task is NOT embarrassingly parallel

Yes, one may go and implement a moving average calculation so that it indeed proceeds through the heaps of data using some intentionally indoctrinated "just"-[CONCURRENT] processing approach ( irrespective of whether this is due to some kind of error, some authority's "advice", professional blindness or just a dual-Socrates-fair ignorance ). This obviously does not mean that the nature of the convolutional stream-processing, present inside the moving average mathematical formulation, has forgotten to be a pure [SERIAL] process, just due to an attempt to enforce it to get calculated inside some degree of "just"-[CONCURRENT] processing.

( Btw. the hard computer-scientists and dual-domain nerds will also object here that the Go language is by design using the best of Rob Pike's skills for having a framework of concurrent coroutines, not any true-[PARALLEL] process-scheduling, even though Hoare's CSP-tools, available in the language concept, may add some salt and pepper and introduce a stop-block type of inter-process communication tools, which will block "just"-[CONCURRENT] code sections into some hardwired CSP-p2p-synchronisation. )


Fact #2: Go distributed ( for any kind of speedup ) only at the end

Having a poor level of performance in [SERIAL] does not set the yardstick. Only after a reasonable amount of performance tuning in a single thread may one benefit from going distributed ( still having to pay additional serial costs, which brings Amdahl's law ( or rather the overhead-strict Amdahl's law ) into the game ).

If one can introduce such a low level of additional setup overheads and still achieve a remarkable parallelism, scaled into the non-[SEQ] part of the processing, there and only there comes a chance to increase the effective performance of the process.

It is not hard to lose much more than to gain in this, so always benchmark the pure-[SEQ] version against the potential tradeoffs of a non-[SEQ] / N[PAR]_processes theoretical, overhead-naive speedup, for which one will pay the cost of the sum of all add-on-[SEQ]-overheads. It pays off if and only if:

(         pure-[SEQ]_processing      [ns]
+       add-on-[SEQ]-setup-overheads [ns]
+        ( non-[SEQ]_processing      [ns] / N[PAR]_processes )
  ) << (  pure-[SEQ]_processing      [ns]
       + ( non-[SEQ]_processing      [ns] / 1 )
         )

Not having this jet-fighter's advantage of both the surplus height and the Sun behind you, never attempt going into any kind of HPC / parallelisation attempts - they will never pay for themselves when not being remarkably << better than a smart [SEQ]-process.



Epilogue: on the overhead-strict Amdahl's Law interactive experiment UI

One animation is worth a million words.

An interactive animation is even better:

So,
assume a process-under-test, which has both a [SERIAL] and a [PARALLEL] part of the process schedule.

Let p be the [PARALLEL] fraction of the process duration ~ ( 0.0 .. 1.0 ), thus the [SERIAL] part does not last longer than ( 1 - p ), right?

So, let's start the interactive experimentation from such a test-case, where p == 1.0, meaning the whole process duration is spent in just the [PARALLEL] part, and both the initial serial and the terminating parts of the process-flow ( which principally are always [SERIAL] ) have zero durations ( ( 1 - p ) == 0. )

Assume the system does no particular magic and thus needs to spend some real steps on the initialisation of each of the [PARALLEL] parts, so as to run it on a different processor ( (1), 2, .., N ). So let's add some overheads, incurred when asked to re-organise the process flow and to marshal + distribute + un-marshal all the necessary instructions and data, so that the intended process can now start and run on N processors in parallel.

These costs are called o ( here initially assumed, for simplicity, to be constant and invariant to N, which is not always the case in reality, on silicon / on NUMA / on distributed infrastructures ).
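The quantities p, N and o just introduced can be combined into one expression for the achievable speedup. The form below is a sketch under two assumptions that are not spelled out in the text: durations are normalised so that the original [SERIAL] run takes 1, and o is expressed as a fraction of that run. S is a symbol introduced here for the speedup.

```latex
S(N) = \frac{1}{(1 - p) + o + \dfrac{p}{N}}
```

With o = 0 this reduces to the classical Amdahl's law, and with p = 1 and o = 0 it gives the ideal linear speedup S(N) = N.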

By clicking on the Epilogue headline above, an interactive environment opens and is free for one's own experimentation.

With p == 1. && o == 0. && N > 1 the performance grows steeply up to the currently achievable [PARALLEL]-hardware O/S limits for a still monolithic O/S code-execution ( still without any additional distribution costs for MPI- and similar depeche-mode distributions of work-units ( where one would immediately have to add indeed a big number of [ms], while our so far best just-[SERIAL] implementation has obviously done the whole job in less than just ~ 22.1 [us] ) ).

But except for such an artificially optimistic case, the job does not look so cheap to get efficiently parallelised.

  • Try having not zero, but just about ~ 0.01% of setup overhead costs o, and the line starts to show a very different nature of the overhead-aware scaling, even for the utmost extreme [PARALLEL] case ( still having p == 1.0 ), with the potential speedup somewhere near just half of the initially super-idealistic linear speedup case.

  • Now, turn p to something closer to reality, somewhere less artificially set than the initial super-idealistic case of == 1.00 --> { 0.99, 0.98, 0.95 } and ... bingo, this is the reality in which process-scheduling ought to be tested and pre-validated.

What does that mean?

As an example, if the overhead ( of launching + finally joining a pool of coroutines ) takes more than just ~ 0.1% of the actual [PARALLEL] processing section duration, there will be no bigger speedup than about 4x ( about 1/4 of the original duration in time ) for 5 coroutines ( having p ~ 0.95 ), and not more than about 10x ( a 10-times shorter duration ) for 20 coroutines. All this assumes that the system has 5 CPU-cores, resp. 20 CPU-cores, free & available and ready ( best with O/S-level CPU-core-affinity mapped processes / threads ) for uninterrupted serving of all those coroutines during their whole life-span, so as to achieve any of the above expected speedups.
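The figures in this example can be checked with a few lines of Go. This is a sketch that assumes the overhead-aware speedup takes the form 1 / ((1 - p) + o + p/N), with the original duration normalised to 1 and o given as a fraction of it; the function name overheadStrictSpeedup is hypothetical.

```go
package main

import "fmt"

// overheadStrictSpeedup (hypothetical name) returns the speedup for a
// parallel fraction p, a setup overhead o (as a fraction of the original
// run time) and n processors, assuming the normalised form
// 1 / ((1 - p) + o + p/n).
func overheadStrictSpeedup(p, o float64, n int) float64 {
	return 1.0 / ((1.0 - p) + o + p/float64(n))
}

func main() {
	for _, n := range []int{1, 5, 20} {
		fmt.Printf("p=0.95 o=0.001 N=%2d -> speedup %.2f\n",
			n, overheadStrictSpeedup(0.95, 0.001, n))
	}
}
```

With p = 0.95 and o = 0.001 this gives roughly 4.15 for N = 5 and roughly 10.15 for N = 20, in line with the 4x and 10x figures quoted above.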

Not having such an amount of hardware resources free and ready for all of those task-units, intended for implementing the [PARALLEL]-part of the process-schedule, the blocking/waiting states will introduce additional absolute wait-states. The resulting performance adds these new [SERIAL] blocking/waiting sections to the overall process duration, and the initially wished-to-have speedups suddenly cease to exist, as the performance factor falls well under << 1.00 ( meaning that the effective run-time was, due to the blocking-states, way slower than the non-parallelised just-[SERIAL] workflow ).

This may sound complicated for new keen experimentators, however we may put it in a reversed perspective. Given that the whole process of distributing the intended [PARALLEL] pool-of-tasks is known not to be shorter than, say, about 10 [us], the overhead-strict graphs show that there needs to be at least about 1000 x 10 [us] of non-blocking, computing-intensive processing inside the [PARALLEL] section so as not to devastate the efficiency of the parallelised processing.

If there is not a sufficiently "fat" piece of processing, the overhead costs ( going remarkably above the above cited threshold of ~ 0.1% ) then brutally devastate the net efficiency of the parallelised processing ( it is then performed at such unjustifiably high relative costs of setup versus the limited net effects of any N processors, as was demonstrated in the available live-graphs ).

It is no surprise for distributed-computing nerds that the overhead o also comes with additional dependencies - on N ( the more processes, the more effort has to be spent on distributing work-packages ), on the sizes of the marshalled data-blobs ( the larger the blobs, the longer the MEM-/IO-devices remain blocked before serving the next process to receive a distributed blob across such a device/resource for each of the target 2..N-th receiving processes ), and on avoided / CSP-signalled, channel-mediated inter-process coordinations ( call it additional per-incident blocking, reducing p further and further below the ultimately nice ideal of 1. ).

So, the real-world reality is rather very far from the initially idealised, nice and promising p == 1.0, ( 1 - p ) == 0.0 and o == 0.0

As was obvious from the very beginning, try to beat rather the 22.1 [us] [SERIAL] threshold, instead of trying to beat the much slower baseline while getting worse and worse, since going [PARALLEL] with realistic overheads and scaling, on top of already under-performing approaches, does not help a single bit.

