1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use super::tables::LineItem;
use crate::error::{ErrorKind, Result};
use crate::types::ArgSelectionVariant;
use numa_gpu::runtime::cpu_affinity::CpuAffinity;
use numa_gpu::utils::CachePadded;
use std::sync::Arc;
use std::time::{Duration, Instant};
// Native TPC-H Q6 kernels, declared with the C ABI and linked in from outside
// this Rust source. `Query6Cpu::run` picks one of the two by
// `ArgSelectionVariant` and calls it once per chunk of rows on each worker.
//
// Calling either function is `unsafe`: the caller must pass pointers that are
// valid for `length` `i32` reads each, plus a writable `revenue` location.
// NOTE(review): the native implementation is not visible here. Presumably each
// kernel evaluates the Q6 predicate over `length` rows and stores (or adds)
// the partial revenue into `*revenue`; confirm against the C source.
extern "C" {
/// Q6 kernel variant selected by `ArgSelectionVariant::Branching`.
/// NOTE(review): the name suggests the row filter uses conditional branches; confirm.
fn tpch_q6_branching(
length: u64,
l_shipdate: *const i32,
l_discount: *const i32,
l_quantity: *const i32,
l_extendedprice: *const i32,
revenue: *mut i64,
);
/// Q6 kernel variant selected by `ArgSelectionVariant::Predication`.
/// NOTE(review): the name suggests a branch-free (predicated) row filter; confirm.
fn tpch_q6_predication(
length: u64,
l_shipdate: *const i32,
l_discount: *const i32,
l_quantity: *const i32,
l_extendedprice: *const i32,
revenue: *mut i64,
);
}
/// CPU executor for TPC-H query 6 that drives the native C-ABI kernels on a
/// dedicated rayon thread pool.
pub struct Query6Cpu {
    /// Which native kernel flavour to run (`Branching` or `Predication`).
    selection_variant: ArgSelectionVariant,
    /// Worker-thread count handed to the rayon thread-pool builder.
    threads: usize,
    /// Core-pinning policy applied to each worker thread as it starts.
    cpu_affinity: CpuAffinity,
}
impl Query6Cpu {
    /// Creates a CPU executor for TPC-H query 6.
    ///
    /// * `threads` - number of worker threads for the rayon pool; `0` lets
    ///   rayon choose the thread count automatically.
    /// * `cpu_affinity` - pinning policy applied to each worker (cloned).
    /// * `selection_variant` - which native kernel variant `run` calls.
    pub fn new(
        threads: usize,
        cpu_affinity: &CpuAffinity,
        selection_variant: ArgSelectionVariant,
    ) -> Self {
        Self {
            threads,
            cpu_affinity: cpu_affinity.clone(),
            selection_variant,
        }
    }

    /// Runs query 6 over `lineitem` and returns the total revenue together
    /// with the measured kernel time.
    ///
    /// The `shipdate`, `discount`, `quantity` and `extendedprice` columns are
    /// split into at most one contiguous chunk per worker thread. Each worker
    /// runs the selected native kernel on its chunk and writes into its own
    /// cache-padded accumulator. The partial results are summed afterwards.
    /// The returned `Duration` covers only the parallel kernel phase, not
    /// thread-pool creation or work partitioning.
    ///
    /// An empty `lineitem` yields a revenue of `0`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::RuntimeError` if the four columns do not all have
    /// the same length, or if the thread pool cannot be created.
    ///
    /// # Panics
    ///
    /// A worker thread panics if its CPU core affinity cannot be set.
    pub fn run(&self, lineitem: &LineItem) -> Result<(i64, Duration)> {
        let l_shipdate = lineitem.shipdate.as_slice();
        let l_discount = lineitem.discount.as_slice();
        let l_quantity = lineitem.quantity.as_slice();
        let l_extendedprice = lineitem.extendedprice.as_slice();

        // The kernels read `length` elements through every column pointer, so
        // mismatched column lengths would be out-of-bounds reads in the FFI call.
        let len = l_shipdate.len();
        if l_discount.len() != len || l_quantity.len() != len || l_extendedprice.len() != len {
            return Err(ErrorKind::RuntimeError(
                "LineItem columns have mismatched lengths".to_string(),
            )
            .into());
        }

        let cpu_affinity = Arc::new(self.cpu_affinity.clone());
        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.threads)
            .start_handler(move |tid| {
                // `tid as u16` truncates above u16::MAX threads; far beyond any real pool.
                cpu_affinity
                    .set_affinity(tid as u16)
                    .expect("Couldn't set CPU core affinity")
            })
            .build()
            .map_err(|_| ErrorKind::RuntimeError("Failed to create thread pool".to_string()))?;

        // With `threads == 0`, rayon chooses the pool size itself. Using the
        // pool's actual size keeps the partitioning below free of division by zero.
        let threads = thread_pool.current_num_threads();

        // Ceiling division produces at most `threads` chunks, so zipping with
        // `thread_revenue` never silently drops a chunk. Clamp to 1 because
        // `slice::chunks(0)` panics (which happens for empty input).
        let chunk_len = ((len + threads - 1) / threads).max(1);

        let mut thread_revenue = vec![CachePadded { value: 0_i64 }; threads];

        let q6_f = match self.selection_variant {
            ArgSelectionVariant::Branching => tpch_q6_branching,
            ArgSelectionVariant::Predication => tpch_q6_predication,
        };

        // Equal-length columns split with the same `chunk_len` produce
        // equal-length chunks at identical row offsets.
        let tasks = l_shipdate
            .chunks(chunk_len)
            .zip(l_discount.chunks(chunk_len))
            .zip(l_quantity.chunks(chunk_len))
            .zip(l_extendedprice.chunks(chunk_len))
            .zip(thread_revenue.iter_mut());

        let timer = Instant::now();
        thread_pool.scope(|s| {
            for ((((shipdate, discount), quantity), extendedprice), revenue) in tasks {
                s.spawn(move |_| {
                    // SAFETY: all four chunks have exactly `shipdate.len()`
                    // elements (columns were checked to be equal length and are
                    // split identically), and they stay borrowed for the whole
                    // scope. `revenue` is a unique `&mut` to this task's
                    // accumulator. This assumes the kernel reads at most `length`
                    // elements per column and writes only `*revenue`.
                    unsafe {
                        q6_f(
                            shipdate.len() as u64,
                            shipdate.as_ptr(),
                            discount.as_ptr(),
                            quantity.as_ptr(),
                            extendedprice.as_ptr(),
                            &mut revenue.value,
                        )
                    };
                });
            }
        });
        let time = timer.elapsed();

        let revenue = thread_revenue.iter().map(|padded| padded.value).sum();
        Ok((revenue, time))
    }
}