Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ out
hs_err_pid*.log
.bsp
metals.sbt
**/output
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ trait Allocation:
extension (buffer: GBinding[?])
def read(bb: ByteBuffer, offset: Int = 0): Unit

def write(bb: ByteBuffer, offset: Int = 0): Unit
def write(bb: ByteBuffer, offset: Int = 0)(using name: sourcecode.FileName, line: sourcecode.Line): Unit

extension [Params, EL <: Layout: LayoutBinding, RL <: Layout: LayoutBinding](execution: GExecution[Params, EL, RL])
def execute(params: Params, layout: EL): RL
def execute(params: Params, layout: EL)(using name: sourcecode.FileName, line: sourcecode.Line): RL

extension (buffers: GBuffer.type)
def apply[T <: Value: {Tag, FromExpr}](length: Int): GBuffer[T]

def apply[T <: Value: {Tag, FromExpr}](buff: ByteBuffer): GBuffer[T]
def apply[T <: Value: {Tag, FromExpr}](buff: ByteBuffer)(using name: sourcecode.FileName, line: sourcecode.Line): GBuffer[T]

extension (buffers: GUniform.type)
def apply[T <: GStruct[T]: {Tag, FromExpr, GStructSchema}](buff: ByteBuffer): GUniform[T]
def apply[T <: GStruct[T]: {Tag, FromExpr, GStructSchema}](buff: ByteBuffer)(using name: sourcecode.FileName, line: sourcecode.Line): GUniform[T]

def apply[T <: GStruct[T]: {Tag, FromExpr, GStructSchema}](): GUniform[T]
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import org.lwjgl.system.MemoryUtil
import java.nio.file.Paths
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

class RuntimeEnduranceTest extends munit.FunSuite:
override def munitTimeout: Duration = Duration("5 minutes")

test("Endurance test for GExecution with multiple programs"):
runEnduranceTest(10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ class ExecutionHandler(runtime: VkCyfraRuntime, threadContext: VulkanThreadConte
private val dsManager: DescriptorSetManager = threadContext.descriptorSetManager
private val commandPool: CommandPool = threadContext.commandPool

def handle[Params, EL <: Layout: LayoutBinding, RL <: Layout: LayoutBinding](execution: GExecution[Params, EL, RL], params: Params, layout: EL)(
using VkAllocation,
): RL =
def handle[Params, EL <: Layout: LayoutBinding, RL <: Layout: LayoutBinding](
execution: GExecution[Params, EL, RL],
params: Params,
layout: EL,
message: String,
)(using VkAllocation): RL =
val (result, shaderCalls) = interpret(execution, params, layout)

val descriptorSets = shaderCalls.map:
Expand Down Expand Up @@ -74,7 +77,7 @@ class ExecutionHandler(runtime: VkCyfraRuntime, threadContext: VulkanThreadConte

val externalBindings = getAllBindings(executeSteps).map(VkAllocation.getUnderlying)
val deps = externalBindings.flatMap(_.execution.fold(Seq(_), _.toSeq))
val pe = new PendingExecution(commandBuffer, deps, cleanup)
val pe = new PendingExecution(commandBuffer, deps, cleanup, message)
summon[VkAllocation].addExecution(pe)
externalBindings.foreach(_.execution = Left(pe)) // TODO we assume all accesses are read-write
result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,36 @@ import io.computenode.cyfra.vulkan.command.{CommandPool, Fence, Semaphore}
import io.computenode.cyfra.vulkan.core.{Device, Queue}
import io.computenode.cyfra.vulkan.util.Util.{check, pushStack}
import io.computenode.cyfra.vulkan.util.VulkanObject
import org.lwjgl.vulkan.VK10.VK_TRUE
import org.lwjgl.vulkan.VK10.{VK_TRUE, vkQueueSubmit}
import org.lwjgl.vulkan.VK13.{VK_PIPELINE_STAGE_2_COPY_BIT, vkQueueSubmit2}
import org.lwjgl.vulkan.{VK13, VkCommandBuffer, VkCommandBufferSubmitInfo, VkSemaphoreSubmitInfo, VkSubmitInfo2}

import scala.collection.mutable
import org.lwjgl.vulkan.{
VK13,
VkCommandBuffer,
VkCommandBufferSubmitInfo,
VkSemaphoreSubmitInfo,
VkSubmitInfo,
VkSubmitInfo2,
VkTimelineSemaphoreSubmitInfo,
}

import scala.util.boundary

/** A command buffer that is pending execution, along with its dependencies and cleanup actions.
*
* You can call `close()` only when `isFinished || isPending` is true
*
* You can call `destroy()` only when all dependants are `isClosed`
*/
class PendingExecution(protected val handle: VkCommandBuffer, val dependencies: Seq[PendingExecution], cleanup: () => Unit)(using Device):
private val semaphore: Semaphore = Semaphore()
private var fence: Option[Fence] = None

def isPending: Boolean = fence.isEmpty
def isRunning: Boolean = fence.exists(f => f.isAlive && !f.isSignaled)
def isFinished: Boolean = fence.exists(f => !f.isAlive || f.isSignaled)
class PendingExecution(protected val handle: VkCommandBuffer, val dependencies: Seq[PendingExecution], cleanup: () => Unit, val message: String)(using
Device,
):
private var gathered = false
def isPending: Boolean = !gathered

def block(): Unit = fence.foreach(_.block())
private val semaphore: Semaphore = Semaphore()
def isRunning: Boolean = !isPending && semaphore.isAlive && semaphore.getValue == 0
def isFinished: Boolean = !semaphore.isAlive || semaphore.getValue > 0
def block(): Unit = semaphore.waitValue(1)

private var closed = false
def isClosed: Boolean = closed
Expand All @@ -39,76 +48,54 @@ class PendingExecution(protected val handle: VkCommandBuffer, val dependencies:
if destroyed then return
close()
semaphore.destroy()
fence.foreach(x => if x.isAlive then x.destroy())
destroyed = true

/** Debug rendering: the caller-supplied message, the command buffer handle, the semaphore, the current lifecycle state and the number of
  * direct dependencies.
  */
override def toString: String =
  // The guards are evaluated top to bottom, so the first state predicate that holds names the state.
  val state = () match
    case _ if isPending  => "Pending"
    case _ if isRunning  => "Running"
    case _ if isFinished => "Finished"
    case _               => "Unknown"
  val dependencyCount = dependencies.size
  s"PendingExecution($message, $handle, $semaphore, state=$state dependencies=$dependencyCount)"

/** Gathers all command buffers and their semaphores for submission to the queue, in the correct order.
*
* When you call this method, you are expected to submit the command buffers to the queue, and signal the provided fence when done.
* @param f
* The fence to signal when the command buffers are done executing.
* When you call this method, you are expected to submit the command buffers to the queue, and signal the provided semaphore when done.
*
* @return
* A sequence of tuples, each containing a command buffer, semaphore to signal, and a set of semaphores to wait on.
*/
private def gatherForSubmission(f: Fence): Seq[((VkCommandBuffer, Semaphore), Set[Semaphore])] =
private def gatherForSubmission(): Seq[((VkCommandBuffer, Semaphore), Set[Semaphore])] =
if !isPending then return Seq.empty
val mySubmission = ((handle, semaphore), dependencies.map(_.semaphore).toSet)
fence = Some(f)
dependencies.flatMap(_.gatherForSubmission(f)).appended(mySubmission)
gathered = true
val mySubmission = ((handle, semaphore), Set.empty[Semaphore])
dependencies.flatMap(_.gatherForSubmission()).appended(mySubmission)

object PendingExecution:
def executeAll(executions: Seq[PendingExecution], queue: Queue)(using Device): Fence = pushStack: stack =>
def executeAll(executions: Seq[PendingExecution], allocation: VkAllocation)(using Device): Unit = pushStack: stack =>
assert(executions.forall(_.isPending), "All executions must be pending")
assert(executions.nonEmpty, "At least one execution must be provided")

val fence = Fence()

val exec: Seq[(Set[Semaphore], Set[(VkCommandBuffer, Semaphore)])] =
val gathered = executions.flatMap(_.gatherForSubmission(fence))
val ordering = gathered.zipWithIndex.map(x => (x._1._1._1, x._2)).toMap
gathered.toSet.groupMap(_._2)(_._1).toSeq.sortBy(x => x._2.map(_._1).map(ordering).min)

val submitInfos = VkSubmitInfo2.calloc(exec.size, stack)
exec.foreach: (semaphores, executions) =>
val pCommandBuffersSI = VkCommandBufferSubmitInfo.calloc(executions.size, stack)
val signalSemaphoreSI = VkSemaphoreSubmitInfo.calloc(executions.size, stack)
executions.foreach: (cb, s) =>
pCommandBuffersSI
.get()
.sType$Default()
.commandBuffer(cb)
.deviceMask(0)
signalSemaphoreSI
.get()
.sType$Default()
.semaphore(s.get)
.stageMask(VK13.VK_PIPELINE_STAGE_2_COPY_BIT | VK13.VK_PIPELINE_STAGE_2_COMPUTE_SHADER_BIT)

pCommandBuffersSI.flip()
signalSemaphoreSI.flip()

val waitSemaphoreSI = VkSemaphoreSubmitInfo.calloc(semaphores.size, stack)
semaphores.foreach: s =>
waitSemaphoreSI
.get()
.sType$Default()
.semaphore(s.get)
.stageMask(VK13.VK_PIPELINE_STAGE_2_COPY_BIT | VK13.VK_PIPELINE_STAGE_2_COMPUTE_SHADER_BIT)

waitSemaphoreSI.flip()
val gathered = executions.flatMap(_.gatherForSubmission()).map(x => (x._1._1, x._1._2, x._2))

val submitInfos = VkSubmitInfo.calloc(gathered.size, stack)
gathered.foreach: (commandBuffer, semaphore, dependencies) =>
val deps = dependencies.toList
val (semaphores, waitValue, signalValue) = ((semaphore.get, 0L, 1L) +: deps.map(x => (x.get, 1L, 0L))).unzip3

val timelineSI = VkTimelineSemaphoreSubmitInfo
.calloc(stack)
.sType$Default()
.pWaitSemaphoreValues(stack.longs(waitValue*))
.pSignalSemaphoreValues(stack.longs(signalValue*))

submitInfos
.get()
.sType$Default()
.flags(0)
.pCommandBufferInfos(pCommandBuffersSI)
.pSignalSemaphoreInfos(signalSemaphoreSI)
.pWaitSemaphoreInfos(waitSemaphoreSI)
.pNext(timelineSI)
.pCommandBuffers(stack.pointers(commandBuffer, allocation.synchroniseCommand))
.pSignalSemaphores(stack.longs(semaphores*))
.pWaitSemaphores(stack.longs(semaphores*))

submitInfos.flip()

check(vkQueueSubmit2(queue.get, submitInfos, fence.get), "Failed to submit command buffer to queue")
fence
check(vkQueueSubmit(allocation.commandPool.queue.get, submitInfos, 0), "Failed to submit command buffer to queue")

def cleanupAll(executions: Seq[PendingExecution]): Unit =
def cleanupRec(ex: PendingExecution): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,59 @@ import io.computenode.cyfra.runtime.VkAllocation.getUnderlying
import io.computenode.cyfra.spirv.SpirvTypes.typeStride
import io.computenode.cyfra.vulkan.command.CommandPool
import io.computenode.cyfra.vulkan.memory.{Allocator, Buffer}
import io.computenode.cyfra.vulkan.util.Util.pushStack
import io.computenode.cyfra.vulkan.util.Util.{check, pushStack}
import io.computenode.cyfra.dsl.Value.Int32
import io.computenode.cyfra.vulkan.core.Device
import izumi.reflect.Tag
import org.lwjgl.BufferUtils
import org.lwjgl.system.MemoryUtil
import org.lwjgl.vulkan.VK10
import org.lwjgl.vulkan.VK13.VK_PIPELINE_STAGE_2_COPY_BIT
import org.lwjgl.vulkan.VK10.{VK_BUFFER_USAGE_TRANSFER_DST_BIT, VK_BUFFER_USAGE_TRANSFER_SRC_BIT}
import org.lwjgl.vulkan.{VK10, VkCommandBuffer, VkCommandBufferBeginInfo, VkDependencyInfo, VkMemoryBarrier2}
import org.lwjgl.vulkan.VK13.*
import org.lwjgl.vulkan.VK10.*

import java.nio.ByteBuffer
import scala.collection.mutable
import scala.util.Try
import scala.util.chaining.*

class VkAllocation(commandPool: CommandPool, executionHandler: ExecutionHandler)(using Allocator, Device) extends Allocation:
class VkAllocation(val commandPool: CommandPool, executionHandler: ExecutionHandler)(using Allocator, Device) extends Allocation:
given VkAllocation = this

override def submitLayout[L <: Layout: LayoutBinding](layout: L): Unit =
val executions = summon[LayoutBinding[L]]
.toBindings(layout)
.map(getUnderlying)
.flatMap(x => Try(getUnderlying(x)).toOption)
.flatMap(_.execution.fold(Seq(_), _.toSeq))
.filter(_.isPending)

PendingExecution.executeAll(executions, commandPool.queue)
if executions.nonEmpty then PendingExecution.executeAll(executions, this)

extension (buffer: GBinding[?])
def read(bb: ByteBuffer, offset: Int = 0): Unit =
val size = bb.remaining()
buffer match
case VkBinding(buffer: Buffer.HostBuffer) => buffer.copyTo(bb, offset)
case binding: VkBinding[?] =>
binding.materialise(commandPool.queue)
binding.materialise(this)
val stagingBuffer = getStagingBuffer(size)
Buffer.copyBuffer(binding.buffer, stagingBuffer, offset, 0, size, commandPool)
stagingBuffer.copyTo(bb, 0)
stagingBuffer.destroy()
case _ => throw new IllegalArgumentException(s"Tried to read from non-VkBinding $buffer")

def write(bb: ByteBuffer, offset: Int = 0): Unit =
def write(bb: ByteBuffer, offset: Int = 0)(using name: sourcecode.FileName, line: sourcecode.Line): Unit =
val size = bb.remaining()
buffer match
case VkBinding(buffer: Buffer.HostBuffer) => buffer.copyFrom(bb, offset)
case binding: VkBinding[?] =>
binding.materialise(commandPool.queue)
binding.materialise(this)
val stagingBuffer = getStagingBuffer(size)
stagingBuffer.copyFrom(bb, 0)
val cb = Buffer.copyBufferCommandBuffer(stagingBuffer, binding.buffer, 0, offset, size, commandPool)
val cleanup = () =>
commandPool.freeCommandBuffer(cb)
stagingBuffer.destroy()
val pe = new PendingExecution(cb, binding.execution.fold(Seq(_), _.toSeq), cleanup)
val pe = new PendingExecution(cb, binding.execution.fold(Seq(_), _.toSeq), cleanup, s"Writing at ${name.value}:${line.value}")
addExecution(pe)
binding.execution = Left(pe)
case _ => throw new IllegalArgumentException(s"Tried to write to non-VkBinding $buffer")
Expand All @@ -72,22 +73,26 @@ class VkAllocation(commandPool: CommandPool, executionHandler: ExecutionHandler)
def apply[T <: Value: {Tag, FromExpr}](length: Int): GBuffer[T] =
VkBuffer[T](length).tap(bindings += _)

def apply[T <: Value: {Tag, FromExpr}](buff: ByteBuffer): GBuffer[T] =
def apply[T <: Value: {Tag, FromExpr}](buff: ByteBuffer)(using name: sourcecode.FileName, line: sourcecode.Line): GBuffer[T] =
val sizeOfT = typeStride(summon[Tag[T]])
val length = buff.capacity() / sizeOfT
if buff.capacity() % sizeOfT != 0 then
throw new IllegalArgumentException(s"ByteBuffer size ${buff.capacity()} is not a multiple of element size $sizeOfT")
GBuffer[T](length).tap(_.write(buff))
GBuffer[T](length).tap(_.write(buff)(using name, line))

extension (uniforms: GUniform.type)
def apply[T <: GStruct[?]: {Tag, FromExpr, GStructSchema}](buff: ByteBuffer): GUniform[T] =
GUniform[T]().tap(_.write(buff))
def apply[T <: GStruct[?]: {Tag, FromExpr, GStructSchema}](
buff: ByteBuffer,
)(using name: sourcecode.FileName, line: sourcecode.Line): GUniform[T] =
GUniform[T]().tap(_.write(buff)(using name, line))

def apply[T <: GStruct[?]: {Tag, FromExpr, GStructSchema}](): GUniform[T] =
VkUniform[T]().tap(bindings += _)

extension [Params, EL <: Layout: LayoutBinding, RL <: Layout: LayoutBinding](execution: GExecution[Params, EL, RL])
def execute(params: Params, layout: EL): RL = executionHandler.handle(execution, params, layout)
def execute(params: Params, layout: EL)(using name: sourcecode.FileName, line: sourcecode.Line): RL =
val message = s"Executing at ${name.value}:${line.value}"
executionHandler.handle(execution, params, layout, message)

private def direct[T <: GStruct[?]: {Tag, FromExpr, GStructSchema}](buff: ByteBuffer): GUniform[T] =
GUniform[T](buff)
Expand All @@ -113,6 +118,38 @@ class VkAllocation(commandPool: CommandPool, executionHandler: ExecutionHandler)
/** Creates a host buffer of `size` bytes that is usable both as the source and as the destination of a transfer. */
private def getStagingBuffer(size: Int): Buffer.HostBuffer =
  val transferUsage = VK_BUFFER_USAGE_TRANSFER_DST_BIT | VK_BUFFER_USAGE_TRANSFER_SRC_BIT
  Buffer.HostBuffer(size, transferUsage)

/** A command buffer, recorded on first access, that contains a single global memory barrier.
  *
  * The barrier uses the same stage and access masks on both sides: compute-shader and transfer stages, with shader read/write, transfer
  * read/write and uniform read accesses.
  *
  * The buffer is begun with `VK_COMMAND_BUFFER_USAGE_SIMULTANEOUS_USE_BIT`, so the same handle can be included in several submissions.
  *
  * @return
  *   the recorded command buffer, allocated from this allocation's `commandPool`
  */
lazy val synchroniseCommand: VkCommandBuffer = pushStack: stack =>
  // VkMemoryBarrier2 takes 64-bit synchronization2 flags, so only VK_*_2_* constants are used here
  // (the legacy VK_ACCESS_TRANSFER_* constants are 32-bit and merely happen to share the same values).
  val stageMask: Long = VK_PIPELINE_STAGE_2_COMPUTE_SHADER_BIT | VK_PIPELINE_STAGE_2_ALL_TRANSFER_BIT
  val accessMask: Long =
    VK_ACCESS_2_SHADER_READ_BIT | VK_ACCESS_2_SHADER_WRITE_BIT | VK_ACCESS_2_TRANSFER_READ_BIT | VK_ACCESS_2_TRANSFER_WRITE_BIT |
      VK_ACCESS_2_UNIFORM_READ_BIT

  val commandBuffer = commandPool.createCommandBuffer()
  val commandBufferBeginInfo = VkCommandBufferBeginInfo
    .calloc(stack)
    .sType$Default()
    .flags(VK_COMMAND_BUFFER_USAGE_SIMULTANEOUS_USE_BIT)

  check(vkBeginCommandBuffer(commandBuffer, commandBufferBeginInfo), "Failed to begin recording command buffer")

  // Source and destination scopes are deliberately identical.
  val memoryBarrier = VkMemoryBarrier2
    .calloc(1, stack)
    .sType$Default()
    .srcStageMask(stageMask)
    .srcAccessMask(accessMask)
    .dstStageMask(stageMask)
    .dstAccessMask(accessMask)

  val dependencyInfo = VkDependencyInfo
    .calloc(stack)
    .sType$Default()
    .pMemoryBarriers(memoryBarrier)

  vkCmdPipelineBarrier2(commandBuffer, dependencyInfo)
  check(vkEndCommandBuffer(commandBuffer), "Failed to finish recording command buffer")

  commandBuffer

object VkAllocation:
private[runtime] def getUnderlying(buffer: GBinding[?]): VkBinding[?] =
buffer match
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ sealed abstract class VkBinding[T <: Value: {Tag, FromExpr}](val buffer: Buffer)
*/
var execution: Either[PendingExecution, mutable.Buffer[PendingExecution]] = Right(mutable.Buffer.empty)

def materialise(queue: Queue)(using Device): Unit =
def materialise(allocation: VkAllocation)(using Device): Unit =
val (pendingExecs, runningExecs) = execution.fold(Seq(_), _.toSeq).partition(_.isPending) // TODO better handle read only executions
if pendingExecs.nonEmpty then
val fence = PendingExecution.executeAll(pendingExecs, queue)
fence.block()
PendingExecution.executeAll(pendingExecs, allocation)
pendingExecs.foreach(_.block())
PendingExecution.cleanupAll(pendingExecs)

runningExecs.foreach(_.block())
Expand Down
Loading