From 85f6aa8b770a20e58c819ac3dd86d1f685462fe0 Mon Sep 17 00:00:00 2001 From: khtsly Date: Mon, 23 Feb 2026 11:49:39 +0700 Subject: [PATCH] rewrite(phase5): finalize identifier replication implementation and fixes the race condition --- docs/api/1.1/client.md | 19 ++++++ src/Client/init.luau | 2 + src/Replication/init.luau | 129 ++++++++++++++++++++++++++++++++++---- 3 files changed, 137 insertions(+), 13 deletions(-) diff --git a/docs/api/1.1/client.md b/docs/api/1.1/client.md index 0acaad1..8f42de2 100644 --- a/docs/api/1.1/client.md +++ b/docs/api/1.1/client.md @@ -8,6 +8,25 @@ For Client-sided operations. local Client = Warp.Client() ``` +## `.awaitReady` + +Yields the current thread until the client has successfully initialized and synchronized with the server's replication data (identifier). Its optionally, but highly recommended to call this before firing or connecting to any events to ensure the network is fully ready. + +::: code-group +```luau [Variable] +() -> () +``` + +```luau [Example] +local Client = Warp.Client() + +-- wait for the client to be fully initialized +Client.awaitReady() + +print("Client is now ready to send and receive events! :D") +``` +::: + ## `.Connect` Connect to an event to receive incoming data from the server. diff --git a/src/Client/init.luau b/src/Client/init.luau index dd75beb..6b17c3b 100644 --- a/src/Client/init.luau +++ b/src/Client/init.luau @@ -29,6 +29,8 @@ local eventSchemas: { [number]: Buffer.SchemaType } = {} local pendingInvokes: { [string]: thread } = {} local invokeId = 0 +Client.awaitReady = Replication.wait_for_ready + Client.useSchema = function(remoteName: string, schema: Buffer.SchemaType) local id = Replication.get_id(remoteName) if not id then diff --git a/src/Replication/init.luau b/src/Replication/init.luau index e2a524d..cc6c9ec 100644 --- a/src/Replication/init.luau +++ b/src/Replication/init.luau @@ -9,10 +9,12 @@ local Buffer = require("./Util/Buffer") local Identifier = require("./Util/Identifier") local identifiers_schema = Buffer.Schema.string -local writer: Buffer.Writer = Buffer.createWriter() local warp_identifier_registry = shared.__warp_identifier_registry if RunService:IsClient() or RunService:IsRunMode() then + local pending_id_yields, pending_name_yields, ready_yields = {}, {}, {} + local is_ready = false + if RunService:IsClient() then _repl.OnClientEvent:Connect(function(b: buffer) if type(b) ~= "buffer" then @@ -24,20 +26,101 @@ if RunService:IsClient() or RunService:IsRunMode() then local id, remote = content[1], content[2] warp_identifier_registry.cache[remote] = id warp_identifier_registry.name[id] = remote + + if pending_id_yields[remote] then + for _, thread in pending_id_yields[remote] do + task.spawn(thread, id) + end + pending_id_yields[remote] = nil + end + if pending_name_yields[id] then + for _, thread in pending_name_yields[id] do + task.spawn(thread, remote) + end + pending_name_yields[id] = nil + end + end + if not is_ready then + is_ready = true + for _, thread in ready_yields do + task.spawn(thread :: thread) + end + table.clear(ready_yields) end end) _repl:FireServer() end - Replication.get_id = function(name: string): number - return warp_identifier_registry.cache[name] + --@yield + -- wait for the identifiers to be replicated from the server + Replication.wait_for_ready = function() + if is_ready then return end + + local thread = coroutine.running() + table.insert(ready_yields, thread) + coroutine.yield() end - Replication.get_name = function(name: string): number - return warp_identifier_registry.name[name] + + --@name string + --@timeout number (default: 0) + Replication.get_id = function(name: string, timeout: number?): number + local cached = warp_identifier_registry.cache[name] + if cached or type(timeout) ~= "number" then return cached end + + local thread = coroutine.running() + + if not pending_id_yields[name] then pending_id_yields[name] = {} end + table.insert(pending_id_yields[name], thread) + + task.delay(timeout, function() + if pending_id_yields[name] then + local idx = table.find(pending_id_yields[name], thread) + if idx then + table.remove(pending_id_yields[name], idx) + task.spawn(thread, nil) + end + end + end) + + local obj: number = coroutine.yield() + if not obj then + warn(`[Replication] timeout: could not find identifier '{name}' after {timeout}s.`) + end + + return obj + end + + --@name string + --@timeout number (default: 0) + Replication.get_name = function(id: number, timeout: number?): string + local cached = warp_identifier_registry.name[id] + if cached or type(timeout) ~= "number" then return cached end + + local thread = coroutine.running() + if not pending_name_yields[id] then pending_name_yields[id] = {} end + table.insert(pending_name_yields[id], thread) + + task.delay(timeout, function() + if pending_name_yields[id] then + local idx = table.find(pending_name_yields[id], thread) + if idx then + table.remove(pending_name_yields[id], idx) + task.spawn(thread, nil) + end + end + end) + + local obj: string = coroutine.yield() + if not id then + warn(`[Replication] timeout: could not find identifier '{id}' after {timeout}s.`) + end + return obj end else - local replication_ready: { Player } = {} + local replication_ready: { Player }, pending_replications = {}, {} + local writer: Buffer.Writer = Buffer.createWriter() local replication_id: number = Identifier.get_id("id_replication") or 1 + local is_scheduled = false if not Identifier.has_name("id_replication") or not replication_id then replication_id = Identifier.get_id("id_replication") or 1 @@ -45,15 +128,35 @@ else local function replicateToAll(content: any, id: number?) if #replication_ready == 0 then return end - local to_repl: any = type(content) == "string" and { [content] = id } or content - Buffer.writeRepl(writer, to_repl, 1, identifiers_schema) - do - local buf = Buffer.build(writer) - Buffer.reset(writer) - for _, player: Player in replication_ready do - _repl:FireClient(player, buf) + + if type(content) == "string" and id then + pending_replications[content] = id + else + for k, v in content :: any do + pending_replications[k] = v end end + + if not is_scheduled then + is_scheduled = true + task.defer(function() + is_scheduled = false + + local count = 0 + for _ in pending_replications do count += 1 end + if count == 0 then return end + + Buffer.writeRepl(writer, pending_replications, count, identifiers_schema) + local buf = Buffer.build(writer) + Buffer.reset(writer) + + for _, player: Player in replication_ready do + _repl:FireClient(player, buf) + end + + table.clear(pending_replications) + end) + end end local function replicateTo(player: Player)