00:06 Today we're going to parse a list of GPX files and see how we can
make this process faster. Without going into how the parsing actually works,
we'll first parse all the files, one by one, and see how long it takes.
00:33 We have a bunch of URLs pointing to GPX files in a local directory.
We parse each file to collect its number of waypoints. In the process, we
force-unwrap the Parser initializer, which only returns nil if the given URL
is invalid:
let pointCounts = urls.map { url in
    Parser(url: url)!.points.count
}
01:25 As a test for any upcoming refactoring, we add up the number of
points from all GPX files. This way, we'll be able to verify that we're not
breaking the parsing later on:
print(pointCounts.reduce(0, +))
02:05 We've prepared a time function to measure how long the parsing
takes. It prints the measured time to the console:
time {
    let pointCounts = urls.map { url in
        Parser(url: url)!.points.count
    }
    print(pointCounts.reduce(0, +))
}
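The time helper itself isn't shown in the episode; a minimal sketch (the exact name and output format are our assumption) could look like this:

```swift
import Dispatch

// Hypothetical sketch of the `time` helper: runs a block and prints
// how long it took, using DispatchTime as a monotonic clock.
func time(_ work: () -> ()) {
    let start = DispatchTime.now()
    work()
    let end = DispatchTime.now()
    let seconds = Double(end.uptimeNanoseconds - start.uptimeNanoseconds) / 1_000_000_000
    print("Time: \(seconds) seconds")
}
```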
Process in Parallel
02:30 With the map function, we're reading the GPX files one after
another. However, reading one file doesn't depend on reading another, so we
could easily process multiple files in parallel, which should improve the speed
of parsing the entire collection.
02:55 But we can't process the URLs in parallel as long as we use map,
so we have to write the code differently. Instead, we'll work with an empty
result array of integers and — in a concurrent way — loop over the URLs,
writing each result into the result array:
var result: [Int] = []
03:47 DispatchQueue.concurrentPerform — formerly known as
dispatch_apply — lets us specify a number of iterations and a block that's
called for each iteration. We set the number of iterations to the length of
the URL array, which means that the current iteration number, passed into the
block, is an index into the array:
DispatchQueue.concurrentPerform(iterations: urls.count) { idx in
}
04:28 Now we can assign the result of a single iteration into the
result array. This means we have to preallocate the result array so that it
has the same length as the URL array:
var result = Array<Int?>(repeating: nil, count: urls.count)
DispatchQueue.concurrentPerform(iterations: urls.count) { idx in
    let url = urls[idx]
    let pointCount = Parser(url: url)!.points.count
    result[idx] = pointCount
}
06:14 The result array contains optional integers, and after the
parsing, all its elements are set to a value. So, before we can reduce the
array to add up the point counts, we have to force-unwrap each optional count:
print(result.map { $0! }.reduce(0, +))
06:39 This new version of the parsing takes 0.359 seconds. That's much
faster, but the code isn't yet correct. It could crash, because modifying a
variable from multiple threads at once is a data race. So, like we've done in
the past, we can use a dispatch queue to safeguard access to the variable:
var result = Array<Int?>(repeating: nil, count: urls.count)
let q = DispatchQueue(label: "sync queue")
DispatchQueue.concurrentPerform(iterations: urls.count) { idx in
    let url = urls[idx]
    let pointCount = Parser(url: url)!.points.count
    q.sync { result[idx] = pointCount }
}
08:13 This way, we make sure that only one task at a time can access the
result variable.
concurrentMap
08:44 We've improved the performance, but our code is now more verbose.
The first version, using map, was much cleaner. Let's see if we can get this
simplicity back by creating a concurrentMap function, which has the same
interface as map.
09:37 We could write concurrentMap for any collection, as long as it
can give us a count and we have a way to randomly access its elements by
index. Unlike the implementation below, the standard library's map also
supports throwing transform functions, but we'll skip that functionality for
now:
extension Array {
    func concurrentMap<B>(_ transform: @escaping (Element) -> B) -> [B] {
        var result = Array<B?>(repeating: nil, count: count)
        let q = DispatchQueue(label: "sync queue")
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            q.sync {
                result[idx] = transform(element)
            }
        }
        return result.map { $0! }
    }
}
11:32 Now we can call concurrentMap, and we no longer have to
force-unwrap the elements of result, because it's nicely hidden inside
concurrentMap:
time {
    let result = urls.concurrentMap { Parser(url: $0)!.points.count }
    print(result.reduce(0, +))
}
12:34 It works, but we're back at the old, slow speed! We made a little
mistake in concurrentMap, because we're calling the transform function inside
the synchronous dispatch to the serial queue. That means all the parsing is done
serially again. To fix this bug, we move the call to the transform function
outside the sync block:
extension Array {
    func concurrentMap<B>(_ transform: @escaping (Element) -> B) -> [B] {
        var result = Array<B?>(repeating: nil, count: count)
        let q = DispatchQueue(label: "sync queue")
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            let transformed = transform(element)
            q.sync {
                result[idx] = transformed
            }
        }
        return result.map { $0! }
    }
}
13:30 The parsing happens concurrently again, and we're back at roughly
the same improved speed.
13:39 The code at the call site is much cleaner now: we're simply
calling concurrentMap instead of map. This isn't necessarily a solution for
everything; we don't need concurrency whenever we call map. In this case, it
works nicely, because we have a good amount of expensive operations, so it pays
off to perform them concurrently. But if we just want to map over an array of
integers and do simple calculations, the overhead of concurrency doesn't make
much sense.
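To see that trade-off concretely, one can compare both variants on a cheap transform. This is a sketch using the concurrentMap defined above (repeated here so the snippet is self-contained); actual timings are machine-dependent:

```swift
import Dispatch

extension Array {
    // Same concurrentMap as in the episode, repeated for a self-contained snippet.
    func concurrentMap<B>(_ transform: @escaping (Element) -> B) -> [B] {
        var result = Array<B?>(repeating: nil, count: count)
        let q = DispatchQueue(label: "sync queue")
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            let transformed = transform(element)
            q.sync { result[idx] = transformed }
        }
        return result.map { $0! }
    }
}

// For a trivial transform, the per-element synchronization cost can outweigh
// the work itself, so plain map is often at least as fast here.
let numbers = Array(1...100_000)
let doubledSerially = numbers.map { $0 * 2 }
let doubledConcurrently = numbers.concurrentMap { $0 * 2 }
assert(doubledSerially == doubledConcurrently)
```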
Abstracting Out ThreadSafe
14:41 There's one more thing to improve. We're using a serial queue to
guard access to the result variable, but we can abstract this out. In fact, we
did this before in an episode about thread
safety.
15:26 We create a generic class, ThreadSafe, that will hold any value
and provide a thread-safe way to access that value:
final class ThreadSafe<A> {
    private var _value: A
    private let queue = DispatchQueue(label: "ThreadSafe")
    init(_ value: A) {
        self._value = value
    }
    var value: A {
        return queue.sync { _value }
    }
}
17:03 In addition to getting the value out, we also need a way to
mutate the value. The most obvious way would be to define a setter that, like
the getter, dispatches to the serial queue. But that's a bad idea, as a couple
of viewers pointed out to us last time: doing something like x.value += 1 can
cause issues, because reading the value and writing it back are performed as
two separate operations on the private queue of the thread-safe x, so another
thread can modify the value in between. Instead, we add an atomically method
that performs the entire mutation in a single dispatch:
final class ThreadSafe<A> {
    func atomically(_ transform: (inout A) -> ()) {
        queue.sync {
            transform(&self._value)
        }
    }
}
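Putting both pieces together, here's the complete class along with a quick check that concurrent increments through atomically don't lose updates (the 1,000-iteration counter is our own example, not from the episode):

```swift
import Dispatch

final class ThreadSafe<A> {
    private var _value: A
    private let queue = DispatchQueue(label: "ThreadSafe")
    init(_ value: A) {
        self._value = value
    }
    var value: A {
        return queue.sync { _value }
    }
    func atomically(_ transform: (inout A) -> ()) {
        queue.sync {
            transform(&self._value)
        }
    }
}

// Each increment reads and writes inside a single queue.sync block,
// so no updates are lost even under concurrent access.
let counter = ThreadSafe(0)
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
    counter.atomically { $0 += 1 }
}
print(counter.value) // 1000
```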
19:06 We can now get rid of the queue in concurrentMap, and instead
wrap the result in a ThreadSafe instance:
extension Array {
    func concurrentMap<B>(_ transform: @escaping (Element) -> B) -> [B] {
        let result = ThreadSafe(Array<B?>(repeating: nil, count: count))
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            let transformed = transform(element)
            result.atomically {
                $0[idx] = transformed
            }
        }
        return result.value.map { $0! }
    }
}
20:10 This seems to work well, and we've cleaned up concurrentMap.
It's a small improvement, but abstracting out the thread safety means we
have one less thing to worry about. And we're able to use ThreadSafe in other
places too.
20:42 We could also consider a version of atomically that writes to
the value asynchronously. In our case, this wouldn't work, because we want to
return from concurrentMap only once all the work is done, and that's a
guarantee the caller relies on. Using sync is therefore the better option. The
downside of sync is that we have to be careful about deadlocks, but in this
case, we're certain the code is correct.
21:27 We've made it possible to apply a minimal change at the call site
— switching from map to concurrentMap — and achieve a drastic performance
improvement for parsing our GPX files. We're calling it a day at this point, but
in the future, we could also make concurrentMap work for Collection and add
support for error throwing.
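As a hint at that future work, a throwing variant could capture each element's success or failure in a Result and rethrow afterwards. This is a sketch of one possible design, not code from the episode:

```swift
import Dispatch

extension Array {
    func concurrentMap<B>(_ transform: @escaping (Element) throws -> B) throws -> [B] {
        var result = Array<Result<B, Error>?>(repeating: nil, count: count)
        let q = DispatchQueue(label: "sync queue")
        DispatchQueue.concurrentPerform(iterations: count) { idx in
            let element = self[idx]
            // Run the transform outside the sync block, capturing any thrown error.
            let transformed = Result { try transform(element) }
            q.sync { result[idx] = transformed }
        }
        // Rethrow the first failure (if any) once all iterations have finished.
        return try result.map { try $0!.get() }
    }
}

let squares = try [1, 2, 3].concurrentMap { $0 * $0 }
```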