diff --git a/lib/README.md b/lib/README.md index c5bedd9..71b9b13 100644 --- a/lib/README.md +++ b/lib/README.md @@ -273,6 +273,37 @@ docs: params: "promises: array>" returns: Promise + - name: each + desc: | + Iterates serially over the given an array of values, calling the predicate callback on each value before continuing. + + If the predicate returns a Promise, we wait for that Promise to resolve before moving on to the next item + in the array. + + If the Promise a predicate returns rejects, the Promise from `Promise.each` is also rejected with the same value. + + If the array of values contains a Promise, when we get to that point in the list, we wait for the Promise to resolve before calling the predicate with the value. + + If a Promise in the array of values is already Rejected when `Promise.each` is called, `Promise.each` rejects with that value immediately (the predicate callback will never be called even once). If a Promise in the list is already Cancelled when `Promise.each` is called, `Promise.each` rejects with `Promise.Error(Promise.Error.Kind.AlreadyCancelled`). If a Promise in the array of values is Started at first, but later rejects, `Promise.each` will reject with that value and iteration will not continue once iteration encounters that value. + + Returns a Promise containing an array of the returned/resolved values from the predicate for each item in the array of values. + + If this Promise returned from `Promise.each` rejects or is cancelled for any reason, the following are true: + - Iteration will not continue. + - Any Promises within the array of values will now be cancelled if they have no other consumers. + - The Promise returned from the currently active predicate will be cancelled if it hasn't resolved yet. + params: + - name: list + type: "array>" + - name: predicate + desc: The callback to call for each value in the list. + type: + kind: function + params: "value: T, index: number" + returns: U | Promise + returns: Promise> + static: true + - name: delay desc: | Returns a Promise that resolves after `seconds` seconds have passed. The Promise resolves with the actual amount of time that was waited. diff --git a/lib/init.lua b/lib/init.lua index 8037f0d..b1f50aa 100644 --- a/lib/init.lua +++ b/lib/init.lua @@ -564,6 +564,106 @@ function Promise.race(promises) end) end +--[[ + Iterates serially over the given an array of values, calling the predicate callback on each before continuing. + If the predicate returns a Promise, we wait for that Promise to resolve before continuing to the next item + in the array. If the Promise the predicate returns rejects, the Promise from Promise.each is also rejected with + the same value. + + Returns a Promise containing an array of the return values from the predicate for each item in the original list. +]] +function Promise.each(list, predicate) + assert(type(list) == "table", ERROR_NON_LIST:format("Promise.each")) + assert(type(predicate) == "function", ERROR_NON_FUNCTION:format("Promise.each")) + + return Promise._new(debug.traceback(nil, 2), function(resolve, reject, onCancel) + local results = {} + local promisesToCancel = {} + + local cancelled = false + + local function cancel() + for _, promiseToCancel in ipairs(promisesToCancel) do + promiseToCancel:cancel() + end + end + + onCancel(function() + cancelled = true + + cancel() + end) + + -- We need to preprocess the list of values and look for Promises. + -- If we find some, we must register our andThen calls now, so that those Promises have a consumer + -- from us registered. If we don't do this, those Promises might get cancelled by something else + -- before we get to them in the series because it's not possible to tell that we plan to use it + -- unless we indicate it here. + + local preprocessedList = {} + + for index, value in ipairs(list) do + if Promise.is(value) then + if value:getStatus() == Promise.Status.Cancelled then + cancel() + return reject(Error.new({ + error = "Promise is cancelled", + kind = Error.Kind.AlreadyCancelled, + context = ("The Promise that was part of the array at index %d passed into Promise.each was already cancelled when Promise.each began.\n\nThat Promise was created at:\n\n%s"):format( + index, + value._source + ) + })) + elseif value:getStatus() == Promise.Status.Rejected then + cancel() + return reject(select(2, value:await())) + end + + -- Chain a new Promise from this one so we only cancel ours + local ourPromise = value:andThen(function(...) + return ... + end) + + table.insert(promisesToCancel, ourPromise) + preprocessedList[index] = ourPromise + else + preprocessedList[index] = value + end + end + + for index, value in ipairs(preprocessedList) do + if Promise.is(value) then + local success + success, value = value:await() + + if not success then + cancel() + return reject(value) + end + end + + if cancelled then + return + end + + local predicatePromise = Promise.resolve(predicate(value, index)) + + table.insert(promisesToCancel, predicatePromise) + + local success, result = predicatePromise:await() + + if not success then + cancel() + return reject(result) + end + + results[index] = result + end + + resolve(results) + end) +end + --[[ Is the given object a Promise instance? ]] diff --git a/lib/init.spec.lua b/lib/init.spec.lua index 1b5622d..1da168c 100644 --- a/lib/init.spec.lua +++ b/lib/init.spec.lua @@ -1236,4 +1236,178 @@ return function() expect(value).to.equal("foo") end) end) + + describe("Promise.each", function() + it("should iterate", function() + local ok, result = Promise.each({ + "foo", "bar", "baz", "qux" + }, function(...) + return {...} + end):_unwrap() + + expect(ok).to.equal(true) + expect(result[1][1]).to.equal("foo") + expect(result[1][2]).to.equal(1) + expect(result[2][1]).to.equal("bar") + expect(result[2][2]).to.equal(2) + expect(result[3][1]).to.equal("baz") + expect(result[3][2]).to.equal(3) + expect(result[4][1]).to.equal("qux") + expect(result[4][2]).to.equal(4) + end) + + it("should iterate serially", function() + local resolves = {} + local callCounts = {} + + local promise = Promise.each({ + "foo", "bar", "baz" + }, function(value, index) + callCounts[index] = (callCounts[index] or 0) + 1 + + return Promise.new(function(resolve) + table.insert(resolves, function() + resolve(value:upper()) + end) + end) + end) + + expect(promise:getStatus()).to.equal(Promise.Status.Started) + expect(#resolves).to.equal(1) + expect(callCounts[1]).to.equal(1) + expect(callCounts[2]).to.never.be.ok() + + table.remove(resolves, 1)() + + expect(promise:getStatus()).to.equal(Promise.Status.Started) + expect(#resolves).to.equal(1) + expect(callCounts[1]).to.equal(1) + expect(callCounts[2]).to.equal(1) + expect(callCounts[3]).to.never.be.ok() + + table.remove(resolves, 1)() + + expect(promise:getStatus()).to.equal(Promise.Status.Started) + expect(callCounts[1]).to.equal(1) + expect(callCounts[2]).to.equal(1) + expect(callCounts[3]).to.equal(1) + + table.remove(resolves, 1)() + + expect(promise:getStatus()).to.equal(Promise.Status.Resolved) + expect(type(promise._values[1])).to.equal("table") + expect(type(promise._values[2])).to.equal("nil") + + local result = promise._values[1] + + expect(result[1]).to.equal("FOO") + expect(result[2]).to.equal("BAR") + expect(result[3]).to.equal("BAZ") + end) + + it("should reject with the value if the predicate promise rejects", function() + local promise = Promise.each({1, 2, 3}, function() + return Promise.reject("foobar") + end) + + expect(promise:getStatus()).to.equal(Promise.Status.Rejected) + expect(promise._values[1]).to.equal("foobar") + end) + + it("should allow Promises to be in the list and wait when it gets to them", function() + local innerResolve + local innerPromise = Promise.new(function(resolve) + innerResolve = resolve + end) + + local promise = Promise.each({ + innerPromise + }, function(value) + return value * 2 + end) + + expect(promise:getStatus()).to.equal(Promise.Status.Started) + + innerResolve(2) + + expect(promise:getStatus()).to.equal(Promise.Status.Resolved) + expect(promise._values[1][1]).to.equal(4) + end) + + it("should reject with the value if a Promise from the list rejects", function() + local called = false + local promise = Promise.each({1, 2, Promise.reject("foobar")}, function(value) + called = true + return "never" + end) + + expect(promise:getStatus()).to.equal(Promise.Status.Rejected) + expect(promise._values[1]).to.equal("foobar") + expect(called).to.equal(false) + end) + + it("should reject immediately if there's a cancelled Promise in the list initially", function() + local cancelled = Promise.new(function() end) + cancelled:cancel() + + local called = false + local promise = Promise.each({1, 2, cancelled}, function() + called = true + end) + + expect(promise:getStatus()).to.equal(Promise.Status.Rejected) + expect(called).to.equal(false) + expect(promise._values[1].kind).to.equal(Promise.Error.Kind.AlreadyCancelled) + end) + + it("should stop iteration if Promise.each is cancelled", function() + local callCounts = {} + + local promise = Promise.each({ + "foo", "bar", "baz" + }, function(value, index) + callCounts[index] = (callCounts[index] or 0) + 1 + + return Promise.new(function() + + end) + end) + + expect(promise:getStatus()).to.equal(Promise.Status.Started) + expect(callCounts[1]).to.equal(1) + expect(callCounts[2]).to.never.be.ok() + + promise:cancel() + + expect(promise:getStatus()).to.equal(Promise.Status.Cancelled) + expect(callCounts[1]).to.equal(1) + expect(callCounts[2]).to.never.be.ok() + end) + + it("should cancel the Promise returned from the predicate if Promise.each is cancelled", function() + local innerPromise + + local promise = Promise.each({ + "foo", "bar", "baz" + }, function(value, index) + innerPromise = Promise.new(function() + end) + return innerPromise + end) + + promise:cancel() + + expect(innerPromise:getStatus()).to.equal(Promise.Status.Cancelled) + end) + + it("should cancel Promises in the list if Promise.each is cancelled", function() + local innerPromise = Promise.new(function() end) + + local promise = Promise.each({innerPromise}, function() end) + + promise:cancel() + + expect(innerPromise:getStatus()).to.equal(Promise.Status.Cancelled) + end) + end) end \ No newline at end of file