Add Promise.each

closes #21
This commit is contained in:
Eryn Lynn 2020-05-13 19:48:45 -04:00
parent 078abeae83
commit 0c0d7f0464
3 changed files with 305 additions and 0 deletions

View file

@ -273,6 +273,37 @@ docs:
params: "promises: array<Promise<T>>"
returns: Promise<T>
- name: each
desc: |
Iterates serially over the given an array of values, calling the predicate callback on each value before continuing.
If the predicate returns a Promise, we wait for that Promise to resolve before moving on to the next item
in the array.
If the Promise a predicate returns rejects, the Promise from `Promise.each` is also rejected with the same value.
If the array of values contains a Promise, when we get to that point in the list, we wait for the Promise to resolve before calling the predicate with the value.
If a Promise in the array of values is already Rejected when `Promise.each` is called, `Promise.each` rejects with that value immediately (the predicate callback will never be called even once). If a Promise in the list is already Cancelled when `Promise.each` is called, `Promise.each` rejects with `Promise.Error(Promise.Error.Kind.AlreadyCancelled`). If a Promise in the array of values is Started at first, but later rejects, `Promise.each` will reject with that value and iteration will not continue once iteration encounters that value.
Returns a Promise containing an array of the returned/resolved values from the predicate for each item in the array of values.
If this Promise returned from `Promise.each` rejects or is cancelled for any reason, the following are true:
- Iteration will not continue.
- Any Promises within the array of values will now be cancelled if they have no other consumers.
- The Promise returned from the currently active predicate will be cancelled if it hasn't resolved yet.
params:
- name: list
type: "array<T | Promise<T>>"
- name: predicate
desc: The callback to call for each value in the list.
type:
kind: function
params: "value: T, index: number"
returns: U | Promise<U>
returns: Promise<array<U>>
static: true
- name: delay
desc: |
Returns a Promise that resolves after `seconds` seconds have passed. The Promise resolves with the actual amount of time that was waited.

View file

@ -564,6 +564,106 @@ function Promise.race(promises)
end)
end
--[[
Iterates serially over the given an array of values, calling the predicate callback on each before continuing.
If the predicate returns a Promise, we wait for that Promise to resolve before continuing to the next item
in the array. If the Promise the predicate returns rejects, the Promise from Promise.each is also rejected with
the same value.
Returns a Promise containing an array of the return values from the predicate for each item in the original list.
]]
function Promise.each(list, predicate)
assert(type(list) == "table", ERROR_NON_LIST:format("Promise.each"))
assert(type(predicate) == "function", ERROR_NON_FUNCTION:format("Promise.each"))
return Promise._new(debug.traceback(nil, 2), function(resolve, reject, onCancel)
local results = {}
local promisesToCancel = {}
local cancelled = false
local function cancel()
for _, promiseToCancel in ipairs(promisesToCancel) do
promiseToCancel:cancel()
end
end
onCancel(function()
cancelled = true
cancel()
end)
-- We need to preprocess the list of values and look for Promises.
-- If we find some, we must register our andThen calls now, so that those Promises have a consumer
-- from us registered. If we don't do this, those Promises might get cancelled by something else
-- before we get to them in the series because it's not possible to tell that we plan to use it
-- unless we indicate it here.
local preprocessedList = {}
for index, value in ipairs(list) do
if Promise.is(value) then
if value:getStatus() == Promise.Status.Cancelled then
cancel()
return reject(Error.new({
error = "Promise is cancelled",
kind = Error.Kind.AlreadyCancelled,
context = ("The Promise that was part of the array at index %d passed into Promise.each was already cancelled when Promise.each began.\n\nThat Promise was created at:\n\n%s"):format(
index,
value._source
)
}))
elseif value:getStatus() == Promise.Status.Rejected then
cancel()
return reject(select(2, value:await()))
end
-- Chain a new Promise from this one so we only cancel ours
local ourPromise = value:andThen(function(...)
return ...
end)
table.insert(promisesToCancel, ourPromise)
preprocessedList[index] = ourPromise
else
preprocessedList[index] = value
end
end
for index, value in ipairs(preprocessedList) do
if Promise.is(value) then
local success
success, value = value:await()
if not success then
cancel()
return reject(value)
end
end
if cancelled then
return
end
local predicatePromise = Promise.resolve(predicate(value, index))
table.insert(promisesToCancel, predicatePromise)
local success, result = predicatePromise:await()
if not success then
cancel()
return reject(result)
end
results[index] = result
end
resolve(results)
end)
end
--[[
Is the given object a Promise instance?
]]

View file

@ -1236,4 +1236,178 @@ return function()
expect(value).to.equal("foo")
end)
end)
describe("Promise.each", function()
it("should iterate", function()
local ok, result = Promise.each({
"foo", "bar", "baz", "qux"
}, function(...)
return {...}
end):_unwrap()
expect(ok).to.equal(true)
expect(result[1][1]).to.equal("foo")
expect(result[1][2]).to.equal(1)
expect(result[2][1]).to.equal("bar")
expect(result[2][2]).to.equal(2)
expect(result[3][1]).to.equal("baz")
expect(result[3][2]).to.equal(3)
expect(result[4][1]).to.equal("qux")
expect(result[4][2]).to.equal(4)
end)
it("should iterate serially", function()
local resolves = {}
local callCounts = {}
local promise = Promise.each({
"foo", "bar", "baz"
}, function(value, index)
callCounts[index] = (callCounts[index] or 0) + 1
return Promise.new(function(resolve)
table.insert(resolves, function()
resolve(value:upper())
end)
end)
end)
expect(promise:getStatus()).to.equal(Promise.Status.Started)
expect(#resolves).to.equal(1)
expect(callCounts[1]).to.equal(1)
expect(callCounts[2]).to.never.be.ok()
table.remove(resolves, 1)()
expect(promise:getStatus()).to.equal(Promise.Status.Started)
expect(#resolves).to.equal(1)
expect(callCounts[1]).to.equal(1)
expect(callCounts[2]).to.equal(1)
expect(callCounts[3]).to.never.be.ok()
table.remove(resolves, 1)()
expect(promise:getStatus()).to.equal(Promise.Status.Started)
expect(callCounts[1]).to.equal(1)
expect(callCounts[2]).to.equal(1)
expect(callCounts[3]).to.equal(1)
table.remove(resolves, 1)()
expect(promise:getStatus()).to.equal(Promise.Status.Resolved)
expect(type(promise._values[1])).to.equal("table")
expect(type(promise._values[2])).to.equal("nil")
local result = promise._values[1]
expect(result[1]).to.equal("FOO")
expect(result[2]).to.equal("BAR")
expect(result[3]).to.equal("BAZ")
end)
it("should reject with the value if the predicate promise rejects", function()
local promise = Promise.each({1, 2, 3}, function()
return Promise.reject("foobar")
end)
expect(promise:getStatus()).to.equal(Promise.Status.Rejected)
expect(promise._values[1]).to.equal("foobar")
end)
it("should allow Promises to be in the list and wait when it gets to them", function()
local innerResolve
local innerPromise = Promise.new(function(resolve)
innerResolve = resolve
end)
local promise = Promise.each({
innerPromise
}, function(value)
return value * 2
end)
expect(promise:getStatus()).to.equal(Promise.Status.Started)
innerResolve(2)
expect(promise:getStatus()).to.equal(Promise.Status.Resolved)
expect(promise._values[1][1]).to.equal(4)
end)
it("should reject with the value if a Promise from the list rejects", function()
local called = false
local promise = Promise.each({1, 2, Promise.reject("foobar")}, function(value)
called = true
return "never"
end)
expect(promise:getStatus()).to.equal(Promise.Status.Rejected)
expect(promise._values[1]).to.equal("foobar")
expect(called).to.equal(false)
end)
it("should reject immediately if there's a cancelled Promise in the list initially", function()
local cancelled = Promise.new(function() end)
cancelled:cancel()
local called = false
local promise = Promise.each({1, 2, cancelled}, function()
called = true
end)
expect(promise:getStatus()).to.equal(Promise.Status.Rejected)
expect(called).to.equal(false)
expect(promise._values[1].kind).to.equal(Promise.Error.Kind.AlreadyCancelled)
end)
it("should stop iteration if Promise.each is cancelled", function()
local callCounts = {}
local promise = Promise.each({
"foo", "bar", "baz"
}, function(value, index)
callCounts[index] = (callCounts[index] or 0) + 1
return Promise.new(function()
end)
end)
expect(promise:getStatus()).to.equal(Promise.Status.Started)
expect(callCounts[1]).to.equal(1)
expect(callCounts[2]).to.never.be.ok()
promise:cancel()
expect(promise:getStatus()).to.equal(Promise.Status.Cancelled)
expect(callCounts[1]).to.equal(1)
expect(callCounts[2]).to.never.be.ok()
end)
it("should cancel the Promise returned from the predicate if Promise.each is cancelled", function()
local innerPromise
local promise = Promise.each({
"foo", "bar", "baz"
}, function(value, index)
innerPromise = Promise.new(function()
end)
return innerPromise
end)
promise:cancel()
expect(innerPromise:getStatus()).to.equal(Promise.Status.Cancelled)
end)
it("should cancel Promises in the list if Promise.each is cancelled", function()
local innerPromise = Promise.new(function() end)
local promise = Promise.each({innerPromise}, function() end)
promise:cancel()
expect(innerPromise:getStatus()).to.equal(Promise.Status.Cancelled)
end)
end)
end