rewrite(phase5): finalize identifier replication implementation and fixes the race condition

This commit is contained in:
khtsly 2026-02-23 11:49:39 +07:00
parent 1fdf90e00c
commit 85f6aa8b77
3 changed files with 137 additions and 13 deletions

View file

@ -8,6 +8,25 @@ For Client-sided operations.
local Client = Warp.Client()
```
## `.awaitReady` <Badge type="warning" text="yield" />
Yields the current thread until the client has successfully initialized and synchronized with the server's replication data (identifier). Its optionally, but highly recommended to call this before firing or connecting to any events to ensure the network is fully ready.
::: code-group
```luau [Variable]
() -> ()
```
```luau [Example]
local Client = Warp.Client()
-- wait for the client to be fully initialized
Client.awaitReady()
print("Client is now ready to send and receive events! :D")
```
:::
## `.Connect`
Connect to an event to receive incoming data from the server.

View file

@ -29,6 +29,8 @@ local eventSchemas: { [number]: Buffer.SchemaType } = {}
local pendingInvokes: { [string]: thread } = {}
local invokeId = 0
Client.awaitReady = Replication.wait_for_ready
Client.useSchema = function(remoteName: string, schema: Buffer.SchemaType)
local id = Replication.get_id(remoteName)
if not id then

View file

@ -9,10 +9,12 @@ local Buffer = require("./Util/Buffer")
local Identifier = require("./Util/Identifier")
local identifiers_schema = Buffer.Schema.string
local writer: Buffer.Writer = Buffer.createWriter()
local warp_identifier_registry = shared.__warp_identifier_registry
if RunService:IsClient() or RunService:IsRunMode() then
local pending_id_yields, pending_name_yields, ready_yields = {}, {}, {}
local is_ready = false
if RunService:IsClient() then
_repl.OnClientEvent:Connect(function(b: buffer)
if type(b) ~= "buffer" then
@ -24,20 +26,101 @@ if RunService:IsClient() or RunService:IsRunMode() then
local id, remote = content[1], content[2]
warp_identifier_registry.cache[remote] = id
warp_identifier_registry.name[id] = remote
if pending_id_yields[remote] then
for _, thread in pending_id_yields[remote] do
task.spawn(thread, id)
end
pending_id_yields[remote] = nil
end
if pending_name_yields[id] then
for _, thread in pending_name_yields[id] do
task.spawn(thread, remote)
end
pending_name_yields[id] = nil
end
end
if not is_ready then
is_ready = true
for _, thread in ready_yields do
task.spawn(thread :: thread)
end
table.clear(ready_yields)
end
end)
_repl:FireServer()
end
Replication.get_id = function(name: string): number
return warp_identifier_registry.cache[name]
--@yield
-- wait for the identifiers to be replicated from the server
Replication.wait_for_ready = function()
if is_ready then return end
local thread = coroutine.running()
table.insert(ready_yields, thread)
coroutine.yield()
end
Replication.get_name = function(name: string): number
return warp_identifier_registry.name[name]
--@name string
--@timeout number (default: 0)
Replication.get_id = function(name: string, timeout: number?): number
local cached = warp_identifier_registry.cache[name]
if cached or type(timeout) ~= "number" then return cached end
local thread = coroutine.running()
if not pending_id_yields[name] then pending_id_yields[name] = {} end
table.insert(pending_id_yields[name], thread)
task.delay(timeout, function()
if pending_id_yields[name] then
local idx = table.find(pending_id_yields[name], thread)
if idx then
table.remove(pending_id_yields[name], idx)
task.spawn(thread, nil)
end
end
end)
local obj: number = coroutine.yield()
if not obj then
warn(`[Replication] timeout: could not find identifier '{name}' after {timeout}s.`)
end
return obj
end
--@name string
--@timeout number (default: 0)
Replication.get_name = function(id: number, timeout: number?): string
local cached = warp_identifier_registry.name[id]
if cached or type(timeout) ~= "number" then return cached end
local thread = coroutine.running()
if not pending_name_yields[id] then pending_name_yields[id] = {} end
table.insert(pending_name_yields[id], thread)
task.delay(timeout, function()
if pending_name_yields[id] then
local idx = table.find(pending_name_yields[id], thread)
if idx then
table.remove(pending_name_yields[id], idx)
task.spawn(thread, nil)
end
end
end)
local obj: string = coroutine.yield()
if not id then
warn(`[Replication] timeout: could not find identifier '{id}' after {timeout}s.`)
end
return obj
end
else
local replication_ready: { Player } = {}
local replication_ready: { Player }, pending_replications = {}, {}
local writer: Buffer.Writer = Buffer.createWriter()
local replication_id: number = Identifier.get_id("id_replication") or 1
local is_scheduled = false
if not Identifier.has_name("id_replication") or not replication_id then
replication_id = Identifier.get_id("id_replication") or 1
@ -45,15 +128,35 @@ else
local function replicateToAll(content: any, id: number?)
if #replication_ready == 0 then return end
local to_repl: any = type(content) == "string" and { [content] = id } or content
Buffer.writeRepl(writer, to_repl, 1, identifiers_schema)
do
local buf = Buffer.build(writer)
Buffer.reset(writer)
for _, player: Player in replication_ready do
_repl:FireClient(player, buf)
if type(content) == "string" and id then
pending_replications[content] = id
else
for k, v in content :: any do
pending_replications[k] = v
end
end
if not is_scheduled then
is_scheduled = true
task.defer(function()
is_scheduled = false
local count = 0
for _ in pending_replications do count += 1 end
if count == 0 then return end
Buffer.writeRepl(writer, pending_replications, count, identifiers_schema)
local buf = Buffer.build(writer)
Buffer.reset(writer)
for _, player: Player in replication_ready do
_repl:FireClient(player, buf)
end
table.clear(pending_replications)
end)
end
end
local function replicateTo(player: Player)