Pipe Sink

I had an idea/desire for a “simple” template class that’d work as my OpenGL thread. A contemporary design for multi-threaded 3D games1 seems to be pooling work and processing it in whatever threads are available. IME OpenGL/GLFW are not re-entrant; Apple2 and GLFW3 explicitly state that it won’t work - so it seems safe to assume that I shouldn’t call their functions from different threads. So to make a super-fast 3D game (or whatever) I need to do less work on the thread which is running OpenGL while allowing other threads to send it whatever work they please.

My requirements are:

  • Separate handler-thread for just OpenGL rendering
  • Other threads need to be able to submit “messages” to the handler-thread
    • a thread’s messages must/should be received FIFO by the handler-thread
    • I don’t need any guarantee of ordering between threads
  • As always, I want to minimise dynamic invocations and allocations
    • only a single virtual invocation is used per message
    • all allocation is done in-place
  • I’m perfectly happy for the handler-thread to create and destroy the GLFW window and OpenGL context
    • a HANDLER instance is used to contain logic for this thing

This feels like a threading primitive, yet I have seen nothing that feels this nimble before. So here goes my blog post on the subject; I’m sure that someone else has done something more sophisticated, however, my fascination with minimalism justifies the time that I spent on this implementation and write-up.

2017-08-04 ; I ran a grammar check and reworked this article to use GoogleTest.

Class Declarations

The solution presented here is composed of two class templates, a single global function, and one typedef for the index type. The typedef is a bit of a flourish, and the global function is my “sleepy wait” method. The classes pipe and sink were originally “uncoupled” and pipe didn’t need the HANDLER type-name. In the future - I’d like to reintroduce that abstraction; for the time being - this is “good enough” to proceed.

The pipe class contains the buffer that the messages are put into, along with the necessary counters of type pipe_sink_index_t to track usage. The actual pipe itself is a logical interface, and only exposes the send() method to client code. The storage and processing mechanisms for messages are hidden from the MESSAGE and HANDLER classes. Variadic template parameters and r-value references are used to reduce the invocation overhead. This leads to an elaborate send() method which is detailed separately below.
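
As a rough mental model of those counters, the sketch below tracks only the three markers and leaves out the buffer, the mutex and the threads. The struct name `markers` and its helper methods are hypothetical, and the invariant `_read <= _ready <= _write <= SIZE` is an assumption read off the listings rather than something the code states.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical, simplified model of the pipe's three markers.
// Assumed invariant: _read <= _ready <= _write <= SIZE.
template <std::size_t SIZE>
struct markers
{
	std::uint16_t _read = 0;   // next byte the handler-thread will process
	std::uint16_t _ready = 0;  // end of the fully written messages
	std::uint16_t _write = 0;  // end of the reserved space

	// bytes the handler-thread may process right now
	std::size_t pending(void) const { return _ready - _read; }

	// bytes that are reserved but still being written
	std::size_t in_flight(void) const { return _write - _ready; }

	// bytes a writer-thread can still reserve (the pipe is not circular)
	std::size_t available(void) const { return SIZE - _write; }

	bool valid(void) const
	{
		return _read <= _ready && _ready <= _write && _write <= SIZE;
	}
};

// Walks one 24-byte message through reserve -> ready -> read.
inline void markers_demo(void)
{
	markers<128> m;
	m._write = 24; // a writer reserved 24 bytes
	assert(m.valid() && m.in_flight() == 24 && m.pending() == 0);
	m._ready = 24; // the writer finished and published them
	assert(m.valid() && m.in_flight() == 0 && m.pending() == 24);
	m._read = 24;  // the handler-thread processed them
	assert(m.valid() && m.pending() == 0 && m.available() == 104);
}
```

Note that `available()` does not grow when `_read` advances; space only comes back when all three markers are reset to zero together.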

The sink class is the “entry point” and creates the pipe for messages and an instance of std::thread to process the messages. The instance of the HANDLER template parameter is created inside of the thread to provide the “meat” of processing messages. In my design - the HANDLER creates and destroys the GLFW window as well as initialising GL function pointers. Variadic template parameters are again used in an attempt to minimise the cost of these methods, however, I’ve not yet found a way to move the parameters into the thread’s lambda. This is all done on the handler-thread, making the sink constructor and destructor complex enough to (also) warrant a separate discussion.

Two further methods are available to ease the use - in both cases these fall under the “porcelain” rather than “plumbing” taxonomy. On occasion (such as during an automated test) a thread will wish to wait for a pipe to empty - the pipe::join() method accomplishes this. Rarely, it may be convenient to access the HANDLER instance from another thread - to support this, sink::operator-> is overloaded to return a pointer. These functions could be provided by client code - so the rest of this document effectively ignores them.

#pragma once

// V$2013 doesn't have `noexcept` so this works around that
// ... and I expect other things will be missing too
#if !defined(_MSC_VER ) || (_MSC_VER >= 1900)
#    ifndef pal_noexcept
#        define pal_noexcept noexcept
#    endif
#else
#    ifndef pal_noexcept
#        define pal_noexcept
#    endif
#endif

#include <thread>
#include <mutex>
#include <stddef.h>
#include <stdint.h>

namespace pal
{
	/// stupid, icky, easy, safe
	void pipe_sink_sleep(void) pal_noexcept;

	/// assumed to be unsigned - but you won't have to interact with it
	typedef uint16_t pipe_sink_index_t;

	// class-prototype - needed to "friend" the class in pipe
	template<class HANDLER, size_t SIZE>
	class sink;

	/// pipe class
	template<class HANDLER, size_t SIZE>
	class pipe final
	{
		static_assert (SIZE < (size_t(1) << (8 * sizeof(pipe_sink_index_t))), "Ensure that pipe_sink_index_t can address all positions in the buffer");

		friend class sink < HANDLER, SIZE >;

		// the guard for this pipe
		std::mutex _guard;

		// write marker for the next message
		pipe_sink_index_t _write = 0;

		// ready marker for the newest message that can be processed
		pipe_sink_index_t _ready = 0;

		// read counter for the next message
		pipe_sink_index_t _read = 0;

		// storage for the messages
		uint8_t _buffer[SIZE];

		// pipe should never be created or destroyed outside of sink
		pipe(void) pal_noexcept {}

		// pipe should never be created or destroyed outside of sink
		~pipe(void) pal_noexcept {}

		// pipe should not be copied
		pipe(const pipe&) = delete;

		// pipe should not be copied
		pipe& operator=(const pipe&) = delete;

		// wait until we can grab some bytes
		template<size_t LENGTH>
		pipe_sink_index_t reserve_space(void) pal_noexcept;

		// mark the bytes we previously grabbed as ready
		template<size_t LENGTH>
		void ready_message(const pipe_sink_index_t) pal_noexcept;
	public:
		/// big main cool method that sends a message and returns when it's queued
		template<class MESSAGE, typename ...ARGS>
		void send(ARGS&& ... args) pal_noexcept;

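		/// blocks the calling thread until every ready message has been processed
		void join(void)
		{
			while (true)
			{
				{
					// hold the lock only for the check, not while sleeping
					std::lock_guard<std::mutex> lock(_guard);

					if (_read == _ready)
						return;
				}

				pal::pipe_sink_sleep();
			}
		}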
	};


	/// sink that processes messages sent along pipe
	template<class HANDLER, size_t SIZE>
	class sink
	{
		std::thread _thread;
		HANDLER* _instance;

		// variable to mark if we're done or not
		// ... could be an atomic bool; as a plain bool, the cross-thread reads and writes are unsynchronised
		bool _done;
	public:

		/// creates the thread and instance before returning
		template<typename ... ARGS>
		sink(ARGS ... args) pal_noexcept;

		/// send your messages with this!
		pal::pipe<HANDLER, SIZE> _pipe;

		HANDLER* operator->(void)
		{
			return _instance;
		}

		/// processes all messages before shutting down the thread and returning
		/// ... don't try to send new messages while shutting down ... that might cause weird problems
		~sink(void) pal_noexcept
		{
			_done = true;
			_thread.join();
		}
	};
}

#ifdef pal_cpp

#include <chrono>

pal_cpp void pal::pipe_sink_sleep(void) pal_noexcept
{
	std::this_thread::sleep_for(std::chrono::nanoseconds(830314));
}

#endif

Sink Constructor

The sink constructor needs to start the processing thread, pass it some arguments, and ensure that it has created the HANDLER before returning. The parameters for the instance need to be “used” before the stack is reclaimed - which is why the meticulous handling is required. I was unable to find a cocktail of captures and arguments that initialised HANDLER with pass-by-move; so the constructor performs copy operations for the time being.4 In my design - this will only happen during startup so the cost (should be) negligible. It’d be a nice thing to suss out over a coffee or a whiskey someday.
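
One possible direction for the pass-by-move problem, offered as a sketch and not as the approach used in this article: `std::thread` moves (or copies) each constructor argument into storage owned by the new thread and then passes those copies to the callable as rvalues, so the callable can take them by value and move them onward. The names `demo_handler` and `start_and_report` are hypothetical, and the `std::promise` handshake stands in for the spin on `_done`.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for a HANDLER; it takes ownership of a move-only argument.
struct demo_handler
{
	std::unique_ptr<std::string> _name;
	explicit demo_handler(std::unique_ptr<std::string> name) : _name(std::move(name)) {}
};

// Starts a thread, moves the arguments into it, and returns once the handler exists.
// Returns the name the handler saw, purely so the result can be inspected.
// (This sketch assumes HANDLER has a `_name` member, as demo_handler does.)
template <class HANDLER, typename... ARGS>
std::string start_and_report(ARGS&&... args)
{
	std::promise<std::string> started;
	std::future<std::string> result = started.get_future();

	// each argument is moved (or copied) into storage owned by the new thread,
	// then handed to the callable as an rvalue
	std::thread worker(
		[](std::promise<std::string> signal, typename std::decay<ARGS>::type... owned)
		{
			HANDLER handler(std::move(owned)...);
			signal.set_value(*handler._name);
		},
		std::move(started), std::forward<ARGS>(args)...);

	const std::string seen = result.get(); // blocks until set_value(); no sleepy-wait
	worker.join();
	return seen;
}

// Moves a unique_ptr through the thread and checks that it arrived.
inline void start_demo(void)
{
	std::unique_ptr<std::string> name(new std::string("renderer"));
	const std::string seen = start_and_report<demo_handler>(std::move(name));
	assert(seen == "renderer");
	assert(name == nullptr); // ownership really did leave the caller
}
```

Because the thread owns its copies of the arguments, the caller's stack is not referenced after the `std::thread` constructor returns; the handshake is then only needed to know that the handler exists.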

With the handler constructed, the thread clears the _done flag to tell the constructor that startup has finished, then enters the “main” loop that it will execute until it’s time to conclude. The first step of the loop is to check for either waiting data or an exit signal - and either sleep or return as appropriate. When data is “ready” we can retrieve the callback - remembering that _buffer + _read is a pointer to the function pointer.5 The message itself is stored immediately after that pointer in memory; this could be a cache alignment issue on non-traditional platforms. The handling is performed by a function defined in the send() method which is described elsewhere. The handling function leaves the std::mutex locked; unlocking it would lead to a need to immediately re-lock it. At this point, the pipe is checked and reset (if possible) before execution loops and checks for an available message.

#pragma once

template<class HANDLER, size_t SIZE>
template<typename ...ARGS>
inline pal::sink<HANDLER, SIZE>::sink(ARGS ... args) pal_noexcept :
	_instance(nullptr)
{
	typedef void(*callback_f)(pal::pipe<HANDLER, SIZE>&, void*, HANDLER&);

	struct nested
	{
		static void thread(pal::sink<HANDLER, SIZE>* self, ARGS&&...args) pal_noexcept
		{
			// consume the arguments and create the handler
			// ... doing it in the heap to avoid problems with really-big queue sizes
			self->_instance = new HANDLER(args...);
			HANDLER& handler = *(self->_instance);

			// okay - signal to the constructor that we're done and he can tear-up the stack (we're not looking at the args anymore)
			self->_done = false;

			// we'll explicitly break from this loop
			self->_pipe._guard.lock();
			while (true)
			{
				// wait for some data to be in the pipe
				while (self->_pipe._read == self->_pipe._ready)
				{
					// unlock before sleeping or returning
					self->_pipe._guard.unlock();

					if (self->_done)
					{
						delete self->_instance;
						return;
					}

					// sleep for a tick and see if the pipe is refilled
					pal::pipe_sink_sleep();
					self->_pipe._guard.lock();
				}

				// unlock before processing a message
				self->_pipe._guard.unlock();

				// the "head" is a function pointer which we can reinterpret_cast like this
				auto callback = reinterpret_cast<callback_f*>(self->_pipe._buffer + self->_pipe._read);

				// the second parameter is the address which starts after the first
				auto content = callback + 1;

				// just call it like this
				(*callback)(self->_pipe, content, handler);

				// ... which leaves the mutex locked and ready to ...

				// reset if appropriate
				if (self->_pipe._read == self->_pipe._ready && self->_pipe._ready == self->_pipe._write)
					self->_pipe._read = self->_pipe._ready = self->_pipe._write = 0;
			}

		}
	};

	// flag that the thread isn't running
	_done = true;

	// launch our thread (then wait for it to finish startup)
	_thread = std::thread(nested::thread, this, args...);

	// wait for the thread to start up
	// ... because we don't want to tear down our ARGS until the thread has consumed them
	while (_done)
	{
		pal::pipe_sink_sleep();
	}
}
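
The buffer layout that the worker loop relies on (a function pointer, then the message bytes) can be tried out in isolation. The following is a minimal single-threaded sketch under stated assumptions: `add_value`, `write_entry`, `read_entry` and `layout_demo` are hypothetical names, the callback returns the entry size instead of advancing `_read`, and entries are padded to pointer alignment, which the listing above does not do.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <new>

// Every entry starts with a callback of this type, followed by the message bytes.
// The callback returns how many bytes the whole entry occupied.
typedef std::size_t (*callback_f)(void* message, int& total);

// Hypothetical message type used only for this illustration.
struct add_value
{
	int _v;
	explicit add_value(int v) : _v(v) {}

	// Entry size, rounded up so that the next entry's pointer stays aligned.
	// (The padding is an addition in this sketch; the original packs entries tightly.)
	static std::size_t entry_size(void)
	{
		const std::size_t raw = sizeof(callback_f) + sizeof(add_value);
		const std::size_t align = alignof(callback_f);
		return ((raw + align - 1) / align) * align;
	}

	// Type-specific callback: it knows the real type, so the reader does not have to.
	static std::size_t apply(void* message, int& total)
	{
		add_value* self = static_cast<add_value*>(message);
		total += self->_v;
		self->~add_value(); // objects made with placement-new are destroyed by hand
		return entry_size();
	}
};

// Writes one entry at `offset` and returns the offset just past it.
inline std::size_t write_entry(std::uint8_t* buffer, std::size_t offset, int value)
{
	callback_f* head = new (buffer + offset) callback_f(&add_value::apply);
	new (head + 1) add_value(value); // the message lives directly after the pointer
	return offset + add_value::entry_size();
}

// Processes one entry at `offset` without knowing the message type.
inline std::size_t read_entry(std::uint8_t* buffer, std::size_t offset, int& total)
{
	callback_f* head = reinterpret_cast<callback_f*>(buffer + offset);
	return offset + (*head)(head + 1, total);
}

// Writes two entries, drains them, and returns the accumulated total.
inline int layout_demo(void)
{
	alignas(std::max_align_t) std::uint8_t buffer[64];
	std::size_t write = 0;
	write = write_entry(buffer, write, 2);
	write = write_entry(buffer, write, 3);

	int total = 0;
	for (std::size_t read = 0; read != write;)
		read = read_entry(buffer, read, total);
	assert(total == 5);
	return total;
}
```

The reader only ever sees a `callback_f*`; everything type-specific is reached through that one indirect call.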

Send Message

When a message is sent, it needs some code tailored to the MESSAGE class that forms the message data. The conventional wisdom is to use a virtual method; however, this left a problem of needing to retrieve a pointer to an inherited instance. I chose to use a nested function (a static function in a nested class) which the compiler instantiates for each use of send() to match the MESSAGE class.6 Sending messages is complicated by the fact that the associated reading code needs to work without knowing what it’s looking at. To work around this, function pointers are used instead of virtual classes7 and these are declared as static members of a nested class to avoid confusion. The nested function itself simply invokes the MESSAGE class’s static apply() method (with the passed HANDLER) before explicitly destroying the message object and advancing the markers. The std::mutex is left locked - which the sink worker thread expects.

Before a message can be written, space must be reserved for it and the callback function; the writer-thread simply waits for there to be enough available space. Once space is ready, the nested function pointer is written and the message itself is instantiated in-place, before the data is marked as “ready” for the sink to process. The last step involves “waiting” for any preceding messages to finish up; consider the following chronological sequence:

  1. Message A (in writer-thread X) reserves space at the head of the pipe and begins writing
  2. Message B (in writer-thread Y) reserves space after A and begins writing itself
  3. Message B finishes but must wait to move the _ready counter up until Message A is done

Once the _ready counter has been moved up, the message has been sent and the writer-thread returns as it’s done with it. Sleepy-waits are again used here; they are the least attractive solution as there are obvious scenarios where the whole thing will hang. Overall - it feels like the area of the class that would benefit the most from the use of std::condition_variable objects.

#pragma once

template<class HANDLER, size_t SIZE>
template<class MESSAGE, typename ... ARGS>
void pal::pipe<HANDLER, SIZE>::send(ARGS&& ... args) pal_noexcept
{
	typedef void(*callback_f)(pal::pipe<HANDLER, SIZE>&, void*, HANDLER&);
	typedef void(*function_f)(pal::pipe<HANDLER, SIZE>*, MESSAGE*, HANDLER&);

	enum size : size_t
	{
		payload = sizeof(function_f) + sizeof(MESSAGE),
	};

	// declare an olde-school static function
	struct nested
	{
		static void function(pal::pipe<HANDLER, SIZE>* pipe, MESSAGE* message, HANDLER& data)
		{
			// run the payload's method
			MESSAGE::apply(data, message);

			// delete the event
			message->~MESSAGE();

			// ... clean it up ...

			// lock the mutex (and leave it so)
			pipe->_guard.lock();

			// advance the read marker
			pipe->_read += size::payload;
		}

	};

	// reserve space for the message
	const auto start = reserve_space<size::payload>();
	static_assert(size::payload <= SIZE, "SAN ; messages must be able to fully fit into buffer");
	static_assert(sizeof(function_f) == sizeof(callback_f), "SAN ; function pointers should be the same size");

	// assign the function
	function_f* pointer = reinterpret_cast<function_f*>(_buffer + start);
	*pointer = nested::function;

	// create the message
	void* content = pointer + 1;
	new (content)MESSAGE(args...);

	// ready the message
	ready_message<size::payload>(start);
}

template<class HANDLER, size_t SIZE>
template<size_t LENGTH>
pal::pipe_sink_index_t pal::pipe<HANDLER, SIZE>::reserve_space(void) pal_noexcept
{
	// lock the pipe and wait for available space
	_guard.lock();
	while ((SIZE - _write) < LENGTH)
	{
		// unlock, wait, relock
		_guard.unlock();
		pal::pipe_sink_sleep();
		_guard.lock();
	}

	// record the space and mark it as used
	const auto start = _write;
	_write += LENGTH;

	// we have a result
	_guard.unlock();
	return start;
}

template<class HANDLER, size_t SIZE>
template<size_t LENGTH>
void pal::pipe<HANDLER, SIZE>::ready_message(const pal::pipe_sink_index_t start) pal_noexcept
{
	// lock the pipe and wait for everyone before us to be ready
	_guard.lock();
	while (_ready != start)
	{
		// unlock, wait, relock
		_guard.unlock();
		pal::pipe_sink_sleep();
		_guard.lock();
	}

	// advance and unlock
	_ready += LENGTH;
	_guard.unlock();
}
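
The ordering rule from the Message A / Message B sequence can be modelled without threads or locks. This is a sketch for illustration only: `commit_model`, `reserve` and `try_ready` are hypothetical names, and `try_ready` returns false at the point where `ready_message()` would sleep and retry.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical single-threaded model of reserve_space() / ready_message();
// it demonstrates the ordering rule only, not the locking.
struct commit_model
{
	std::size_t _write = 0;
	std::size_t _ready = 0;

	// hand out the next `length` bytes
	std::size_t reserve(std::size_t length)
	{
		const std::size_t start = _write;
		_write += length;
		return start;
	}

	// a message may only be published once everything before it is published;
	// returns false where the real code would sleep and retry
	bool try_ready(std::size_t start, std::size_t length)
	{
		if (_ready != start)
			return false;
		_ready += length;
		return true;
	}
};

// Replays the A-then-B sequence with B finishing first.
inline void commit_demo(void)
{
	commit_model pipe;
	const std::size_t a = pipe.reserve(16); // step 1: Message A reserves
	const std::size_t b = pipe.reserve(24); // step 2: Message B reserves after A
	assert(!pipe.try_ready(b, 24));         // step 3: B is done first, but must wait
	assert(pipe.try_ready(a, 16));          // A publishes
	assert(pipe.try_ready(b, 24));          // now B can publish
	assert(pipe._ready == pipe._write);
}
```

If writer-thread X never finishes Message A, writer-thread Y never gets past step 3, which is one of the hang scenarios mentioned above.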

Thoughts

The implementation demonstrates some subtle characteristics and shortcomings. Hopefully, the characteristics foster good development practices while providing an efficient implementation of the desired functionality. Ideally, the shortcomings could be corrected without changing the API.

Lack of Error Paths

“Simplicity is prerequisite for reliability.”

Error handling is a bugbear for programmers since, by its nature, it involves the software following unanticipated logic. Fundamentally, this is usually caused by:

  • inputs from an upper layer outside of the expected ranges
  • outputs from a lower layer that defy expectations

Since this system functions as a “pure” and opaque IoC - it should blindly pass whatever data it is given along to client code as instructed. Most sorts of errors would either be detected at compile-time or would be outside the realm of what this software could be expected to test. Meaningful logic is delegated to client code; the software shouldn’t be able to “err” without exotic8 scenarios. The obvious contradiction to this is the fact that if the processing of a message sends a message, then the system will lock up. Given the fact that it’s a stated limitation, error checking for it seems as meaningful as checks for abnormal thread termination.

Templates and Allocation

The use of templates trades (usually irrelevant)9 compiled footprint size for a nimble framework to write a sort of parallel messaging system with. In effect, the system doesn’t care what classes are used for messages and what’s used for the handler at the end - as long as compatible calls can be made it should “just work.” Since the messages are allocated with in-place allocation, they should be no more complex than copying the parameters. There’s a caveat in all this - compilers don’t always behave themselves and the rushed nature of this development means that the software hasn’t been evaluated on a large number of systems. Going forward - I’d like to post this source-code publicly and assess it, and other tools, with something like TravisCI and SonarQube across a variety of platforms. The time required to get to grips with a new CI tool and transplant my build configuration is the only real barrier here.

Pass by RValue Reference

One of the more interesting C++11 features was the option to pass by “rvalue reference” which (presumably) reduces the overhead in function calls by letting arguments be moved rather than copied. Between this and the variadic templates (also introduced in C++11), the send() method can be almost-as-good-as-by-hand without sacrificing the cohesion of a sealed class. I was (frustratingly) unable to find a cocktail of closures and capture symbols that would allow the HANDLER to be constructed in this manner. With more time, it would be nice to sit with a coffee and suss that out.
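
One detail worth illustrating: inside a function template, a named parameter such as `args` is always an lvalue, so the move only happens if the arguments are passed on with `std::forward`. The sketch below is not taken from the listings; `label` and `construct_in_place` are hypothetical names used to show the difference between a copied and a moved argument.

```cpp
#include <cassert>
#include <new>
#include <string>
#include <utility>

// Hypothetical message that records how its argument arrived.
struct label
{
	std::string _text;
	bool _moved;
	explicit label(const std::string& text) : _text(text), _moved(false) {}
	explicit label(std::string&& text) : _text(std::move(text)), _moved(true) {}
};

// Constructs a MESSAGE in caller-provided storage, preserving the value category
// of each argument. Without std::forward, `args...` would always be lvalues.
template <class MESSAGE, typename... ARGS>
MESSAGE* construct_in_place(void* storage, ARGS&&... args)
{
	return new (storage) MESSAGE(std::forward<ARGS>(args)...);
}

// Builds one label from an lvalue and one from a temporary.
inline void forwarding_demo(void)
{
	alignas(label) unsigned char first[sizeof(label)];
	alignas(label) unsigned char second[sizeof(label)];

	std::string kept("kept");
	label* a = construct_in_place<label>(first, kept);                      // copied
	label* b = construct_in_place<label>(second, std::string("temporary")); // moved

	assert(!a->_moved && a->_text == "kept" && kept == "kept");
	assert(b->_moved && b->_text == "temporary");

	a->~label();
	b->~label();
}
```

Applied to `send()`, the same idea would mean constructing the message with `std::forward<ARGS>(args)...` rather than `args...`.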

True Circular Pipe

The pipe is not circular; when the queue does not have enough space, a writer-thread will wait until it can fit the whole message into the queue. Implementing a true-circular buffer would (potentially) reduce the likelihood (frequency) of writer-threads needing to wait for space to be available. This wouldn’t be a magic bullet, however - with numerous small messages, it’s likely that a single large message could be blocked indefinitely.

Since posting this, I’ve been made aware of a (non-magical) ring-buffer.
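
For reference, a plain ring-buffer can be sketched with two free-running counters and a mask. This is not the ring-buffer referred to above and not a drop-in replacement for the pipe: `byte_ring` is a hypothetical, single-threaded class that copies raw bytes, so it would only suit trivially copyable payloads (a message constructed in-place could not straddle the wrap point).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical byte ring; SIZE must be a power of two so the mask works.
template <std::size_t SIZE>
class byte_ring
{
	static_assert(SIZE != 0 && (SIZE & (SIZE - 1)) == 0, "SIZE must be a power of two");

	std::uint8_t _buffer[SIZE];
	std::size_t _head = 0; // total bytes ever read
	std::size_t _tail = 0; // total bytes ever written

public:
	std::size_t used(void) const { return _tail - _head; }
	std::size_t free_space(void) const { return SIZE - used(); }

	// copies `length` bytes in, wrapping at the end; false if they do not fit
	bool write(const void* data, std::size_t length)
	{
		if (length > free_space())
			return false;
		const std::uint8_t* bytes = static_cast<const std::uint8_t*>(data);
		for (std::size_t i = 0; i < length; ++i)
			_buffer[(_tail + i) & (SIZE - 1)] = bytes[i];
		_tail += length;
		return true;
	}

	// copies `length` bytes out, wrapping at the end; false if too few are stored
	bool read(void* data, std::size_t length)
	{
		if (length > used())
			return false;
		std::uint8_t* bytes = static_cast<std::uint8_t*>(data);
		for (std::size_t i = 0; i < length; ++i)
			bytes[i] = _buffer[(_head + i) & (SIZE - 1)];
		_head += length;
		return true;
	}
};

// Fills, partly drains, then writes across the wrap point.
inline void ring_demo(void)
{
	byte_ring<8> ring;
	const char data[6] = {'a', 'b', 'c', 'd', 'e', 'f'};
	char out[6] = {};

	assert(ring.write(data, 6));
	assert(!ring.write(data, 6)); // only 2 bytes free
	assert(ring.read(out, 4));    // frees space at the front
	assert(ring.write(data, 6));  // this write wraps around the end
	assert(ring.used() == 8);
}
```

Unlike the pipe above, space freed by the reader is reusable immediately; there is no need to wait for all markers to meet before resetting.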

Non-Sleepy Wait / Conditional Variables / Priority Correction

The use of “sleepy-wait” to synchronize is crude and should be avoided. The mechanism results in threads either sleeping too long - ignoring perfectly usable data - or sleeping too briefly - occupying the CPU by pestering it with “iz ready now?” checks. The standard condition variable10 approach would allow threads to “suspend” until something changes. Both approaches (however) suffer from a distortion of thread priority - a higher priority thread could easily end up waiting for a lower priority one. For the purpose of this article, however, the sleepy-wait was chosen due to its simplicity.
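
A minimal sketch of the condition variable approach, assuming a single counter that stands in for the pipe’s _ready marker. `ready_counter`, `advance`, `wait_for_at_least` and `demo_wait` are hypothetical names; only `std::mutex`, `std::condition_variable` and `std::thread` come from the standard library.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical counter standing in for the pipe's _ready marker.
class ready_counter
{
	std::mutex _guard;
	std::condition_variable _changed;
	int _ready = 0;

public:
	// writer side: publish, then wake anyone who is waiting
	void advance(int amount)
	{
		{
			std::lock_guard<std::mutex> lock(_guard);
			_ready += amount;
		}
		_changed.notify_all();
	}

	// reader side: suspend until at least `target` has been published
	int wait_for_at_least(int target)
	{
		std::unique_lock<std::mutex> lock(_guard);
		// the predicate guards against spurious wake-ups and missed notifications
		_changed.wait(lock, [this, target] { return _ready >= target; });
		return _ready;
	}
};

// Runs a writer on a second thread and returns what the waiting thread observed.
inline int demo_wait(void)
{
	ready_counter counter;
	std::thread writer([&counter] { counter.advance(3); counter.advance(4); });
	const int seen = counter.wait_for_at_least(7);
	writer.join();
	assert(seen == 7);
	return seen;
}
```

The waiting thread consumes no CPU while suspended and wakes as soon as the writer notifies, which addresses both the “too long” and the “too briefly” cases.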

Cache Alignment

Cache alignment is a “hot topic” in performance optimisation - but somewhat irrelevant here. When data is used by the CPU it must be moved from main memory into the CPU’s cache, generally in 64 byte11 12 13 “lines” which are pulled from main memory, into the cache, all at once. By aligning data structures so that they are spread across as few CPU cache lines as possible,14 the likelihood of a cache miss is decreased and the CPU is less likely to stall. This poses an odd challenge since the static_assert checks related to message size would become more complicated.

Data written to the queue will have a performance characteristic unknown to the author. Reads from the queue, however, will be carried out sequentially. Since data is “always” used - the problem of wasted cycles pulling cache lines into the CPU seems (largely) irrelevant. Any cache line pulled in to hold the tail of one message will, likely, contain data for the next message - and when starting a message, its head will likely have been cached by the message in front of it.
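
To give a feel for how the size checks would change if entries were aligned to cache lines, here is a small compile-time sketch. The 64-byte line size is an assumption (common, but not universal), and `round_up`, `padded_payload`, `fits` and the two message structs are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Assumed cache-line size; 64 bytes is common but not universal.
constexpr std::size_t cache_line = 64;

// Rounds `size` up to the next multiple of `alignment` (which must be a power of two).
constexpr std::size_t round_up(std::size_t size, std::size_t alignment)
{
	return (size + alignment - 1) & ~(alignment - 1);
}

// Bytes an entry would occupy if every entry had to start on a cache line:
// the function pointer, then the message, then padding.
template <class MESSAGE>
constexpr std::size_t padded_payload(void)
{
	return round_up(sizeof(void (*)(void)) + sizeof(MESSAGE), cache_line);
}

// Hypothetical messages of different sizes.
struct small_message { int _v; };
struct large_message { char _bytes[100]; };

static_assert(padded_payload<small_message>() == 64, "fits in one line");
static_assert(padded_payload<large_message>() == 128, "spills into a second line");

// The "does it fit" check has to use the padded size, not the raw one.
template <class MESSAGE, std::size_t SIZE>
constexpr bool fits(void)
{
	return padded_payload<MESSAGE>() <= SIZE;
}

// A 128-byte pipe would hold one large message, but a 64-byte pipe would not.
inline void alignment_demo(void)
{
	assert((fits<large_message, 128>()));
	assert(!(fits<large_message, 64>()));
	assert((fits<small_message, 64>()));
}
```

The cost is visible in the numbers: a 4-byte message would occupy a full 64-byte slot, which is part of why the tightly packed layout seems reasonable for a queue that is read sequentially.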

Applicability

This was written for an OpenGL renderer; past iterations of this worked well enough that I could submit GL calls faster than the system could process them. During development, I read a passage about a parallel renderer15 that would benefit from being able to split jobs amongst various threads and rely upon the HANDLER to merge them at the end. This scenario would likely involve multiple instances of pipe-sink talking to each other but seems interesting to explore as I go on with this.

Overall, my goal was a convenient class to help parallelize rendering work - I feel that this is a success.

#include <util/pal.hpp>

#include <gtest/gtest.h>

// assert() is used inside the message types below
#include <cassert>

struct accumulator
{
	int _val;
	const char* _prefix;
	// initialisers are listed in declaration order (_val, then _prefix)
	accumulator(const char* prefix, int start) pal_noexcept :
		_val(start),
		_prefix(prefix)
	{
	}

	~accumulator(void)
	{
	}

};

TEST(pipe_sink, test_creation)
{
	pal::sink<accumulator, 128> test_sink("test_sink", -1);


	ASSERT_EQ(-1, test_sink->_val);
}

TEST(pipe_sink, test_sending)
{
	pal::sink<accumulator, 128> test_send("test_send", -1);

	struct addone
	{
		const double sentinel = 3.14;
		int _v;
		addone(int v) : _v(v)
		{
		}

		static void apply(accumulator& them, addone* self)
		{
			assert(3.14 == self->sentinel);

			them._val += self->_v;
		}

		~addone(void)
		{
			assert(3.14 == this->sentinel);

		}
	};

	test_send._pipe.send<addone>(1);

	test_send._pipe.join();
	ASSERT_EQ(0, test_send->_val);

	test_send._pipe.send<addone>(2);
	test_send._pipe.send<addone>(3);

	test_send._pipe.join();
	ASSERT_EQ(5, test_send->_val);

	struct checkis
	{
		const int _value;
		checkis(int value) : _value(value)
		{
		}

		static void apply(accumulator& them, checkis* self)
		{
			ASSERT_EQ(self->_value, them._val) << "FAILED\n\te=" << self->_value << "\n\ta=" << them._val << std::endl;
		}
	};
	test_send._pipe.send<checkis>(5);
	test_send._pipe.send<addone>(4);
	test_send._pipe.send<checkis>(9);

}

  1. https://en.wikipedia.org/wiki/4A_Engine
  2. https://developer.apple.com/library/content/documentation/GraphicsImaging/Conceptual/OpenGL-MacProgGuide/opengl_threading/opengl_threading.html
  3. http://www.glfw.org/docs/latest/intro_guide.html#reentrancy
  4. I haven’t tried this since switching to a nested class rather than a lambda
  5. This threw me and was hard to debug
  6. As would a nested subclass. No savings there.
  7. Virtual classes would have an unknown offset. While this could
  8. Threads can be closed by the system or user code unexpectedly - that would kill it
  9. Competitive programming “DemoScene” uses a small code-footprint to demonstrate highly detailed graphics scenes. Since templates are generally inlined, I would expect that they have a firm impact on code-footprint.
  10. https://en.wikipedia.org/wiki/Monitor_(synchronization)#Condition_variables_2
  11. https://course.ccs.neu.edu/com3200/
  12. https://stackoverflow.com/questions/7281699/aligning-to-cache-line-and-knowing-the-cache-line-size#7281770
  13. http://lemire.me/blog/2012/05/31/data-alignment-for-speed-myth-or-reality/
  14. Generally, this is done by allocating more space than needed and ignoring the extra space at the start of the allocated block.
  15. Akenine-Moller and Haines ; Real-Time Rendering Second Edition, Chapter 10.5.1
About Peter LaValle
honks at geese. speaks the byzantine languages that make the magic boxes do the things. Someday maybe I'll retire and be a graphics programmer or demoscene coder.