• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

concurrentqueue源码阅读

武飞扬头像
山有木兮啊
帮助1

1、ConcurrentQueue构造

template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue;
// 三种构造函数:两个有参一个移动构造,禁止拷贝构造和赋值
// 有参构造函数的步骤大同小异,都是对哈希和链表(变量名叫list,但是是连续的内存)初始化,哈希的每个key在
// populate_initial_implicit_producer_hash()函数内都初始化为0
// 在populate_initial_block_list中先申请堆内存,然后再用placement new对每个内存进行初始化
explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
			: producerListTail(nullptr),
			  producerCount(0),
			  initialBlockPoolIndex(0),
			  nextExplicitConsumerId(0),
			  globalExplicitConsumerOffset(0)
{
	implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
	populate_initial_implicit_producer_hash();
	populate_initial_block_list(capacity / BLOCK_SIZE   ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
}

// 
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
			: producerListTail(nullptr),
			  producerCount(0),
			  initialBlockPoolIndex(0),
			  nextExplicitConsumerId(0),
			  globalExplicitConsumerOffset(0)
{
	implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
	populate_initial_implicit_producer_hash();
	size_t blocks = (((minCapacity   BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers   1)   2 * (maxExplicitProducers   maxImplicitProducers);
	populate_initial_block_list(blocks);
}
学新通

2、enqueue压入数据

// 压入大同小异,只分析一种
inline bool enqueue(T const &item)
{
	MOODYCAMEL_CONSTEXPR_IF(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0)
	return false; // 直接看inner_enqueue()
	else return inner_enqueue<CanAlloc>(item);
}

inline bool inner_enqueue(U &&element)
{
	auto producer = get_or_add_implicit_producer();
	return producer == nullptr ? false : 
		producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
	// producer是一个容器,真正将元素压到队列中是enqueue函数
}

// 压入的重点在get_or_add_implicit_producer()
// 获取值存放的内存
ImplicitProducer *get_or_add_implicit_producer()
{
	auto id = details::thread_id(); // 固定值,同一个线程id相同
	auto hashedId = details::hash_thread_id(id); // 经过计算后的hash值也相同
	/*
		implicitProducerHash是一个原子对象,存放的是ImplicitProducerHash的地址值
		ImplicitProducerHash是一个结构体,成员prev指向下一个ImplicitProducerHash
		每个ImplicitProducerHash都有一个hash数组,
		一下就是遍历链表每个节点,找到与id相同的,然后再将value存放到哈希表,
		如果已经存满会跳出循环,然后对哈希表进行扩容,每次扩容会将容量乘2
	*/
	auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
	assert(mainHash != nullptr); // silence clang-tidy and MSVC warnings (hash cannot be null)
	for (auto hash = mainHash; hash != nullptr; hash = hash->prev)
	{
		// Look for the id in this hash
		auto index = hashedId;
		while (true)
		{ // Not an infinite loop because at least one slot is free in the hash table
			index &= hash->capacity - 1u;

			auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
			if (probedKey == id)
			{
				// value类型为ImplicitProducer,此处是想复用之前创建的value
				// 当扩容时会在hash->prev新增一块内存,之前的内存(sizeof(ImplicitProducerKVP) * newCapacity)不会释放,释放操作在析构函数,会一层一层释放
				// 所以为了减少申请内存次数,需要将之前的ImplicitProducer放到mainHash中
				// 在此不用加锁的原因是hashedId不一样,不同线程不会访问到相同数据
				auto value = hash->entries[index].value;
				if (hash != mainHash)
				{
					index = hashedId;
					while (true)
					{
						index &= mainHash->capacity - 1u;
						auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
						auto reusable = details::invalid_thread_id2;
						// 当此处内存未被使用时key会被赋值为id,并将value赋值
						if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) ||
							mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed))
#else
						if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed))
						
#endif
						{
							mainHash->entries[index].value = value;
							break;
						}
						  index;
					}
				}

				return value;
			}
			if (probedKey == details::invalid_thread_id)
			{
				break; // Not in this hash table
			}
			  index;
		}
	}

	// Insert!
	auto newCount = 1   implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
	while (true)
	{
		// NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
		// 如果newCount大于mainHash容量的二分之一,并且获取到锁
		if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire))
		{
			// We've acquired the resize lock, try to allocate a bigger hash table.
			// Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
			// we reload implicitProducerHash it must be the most recent version (it only gets changed within this
			// locked block).
			// 重新加载一遍mainHash,有可能在其他线程被更新,所以在获取到锁后需更新一遍mainHash
			mainHash = implicitProducerHash.load(std::memory_order_acquire);
			if (newCount >= (mainHash->capacity >> 1))
			{
				// 如果更新后newCount还是大于mainHash容量的二分之一,则进行扩容
				size_t newCapacity = mainHash->capacity << 1;
				while (newCount >= (newCapacity >> 1))
				{
					newCapacity <<= 1;
				} // 将mainHash扩容至最接近且大于newCount的2的幂次方
				auto raw = static_cast<char *>((Traits::malloc)(sizeof(ImplicitProducerHash)   std::alignment_of<ImplicitProducerKVP>::value - 1   sizeof(ImplicitProducerKVP) * newCapacity));
				if (raw == nullptr)
				{
					// Allocation failed
					implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
					implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
					return nullptr;
				}
				
				// 对newHash进行初始化并将newHash更新为mainHash
				auto newHash = new (raw) ImplicitProducerHash;
				newHash->capacity = static_cast<size_t>(newCapacity);
				// align_for计算内存对齐,并找到合适的位置返回,在上面多申请了(内存对齐数-1)
				// 因为下面是使用placement new进行初始化,所以当内存不对齐时会使效率变低或出错
				newHash->entries = reinterpret_cast<ImplicitProducerKVP *>(details::align_for<ImplicitProducerKVP>(raw   sizeof(ImplicitProducerHash)));
				for (size_t i = 0; i != newCapacity;   i)
				{
					new (newHash->entries   i) ImplicitProducerKVP;
					newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
				}
				newHash->prev = mainHash;
				implicitProducerHash.store(newHash, std::memory_order_release);
				implicitProducerHashResizeInProgress.clear(std::memory_order_release);
				mainHash = newHash;
			}
			else
			{
				implicitProducerHashResizeInProgress.clear(std::memory_order_release);
			}
		}

		// If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
		// to finish being allocated by another thread (and if we just finished allocating above, the condition will
		// always be true)
		/**
		 * 如果newCount小于mainHash->capacity的四分之三,则在mainHash插入
		 * recycle_or_create_producer作用是先从producerListTail找到可用的ProducerBase
		 * 如果没有找到则会创建ptr,并将ptr原子性的替换为producerListTail的头节点
		 * 由于producerListTail在构造函数中初始化为nullptr,故第一次插入数据一定会创建节点,
		 * producerListTail节点的增加即在此函数中
		 */
		if (newCount < (mainHash->capacity >> 1)   (mainHash->capacity >> 2))
		{
			auto producer = static_cast<ImplicitProducer *>(recycle_or_create_producer(false));
			if (producer == nullptr)
			{
				implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
				return nullptr;
			}

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
			producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
			producer->threadExitListener.userData = producer;
			details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

			auto index = hashedId;
			while (true) // 找到适合插入的位置并返回
			{
				index &= mainHash->capacity - 1u;
				auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
				auto reusable = details::invalid_thread_id2;
				if (mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed))
				{
					implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed); // already counted as a used slot
					mainHash->entries[index].value = producer;
					break;
				}
#endif
				if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed))
				{
					mainHash->entries[index].value = producer;
					break;
				}
				  index;
			}
			return producer;
		}

		// Hmm, the old hash is quite full and somebody else is busy allocating a new one.
		// We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
		// we try to allocate ourselves).
		mainHash = implicitProducerHash.load(std::memory_order_acquire);
	}
}
学新通
template <AllocationMode allocMode, typename U>
inline bool enqueue(U &&element)
{
	index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
	index_t newTailIndex = 1   currentTailIndex;
	if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0)
	{
		// We reached the end of a block, start a new one
		auto head = this->headIndex.load(std::memory_order_relaxed);
		assert(!details::circular_less_than<index_t>(currentTailIndex, head));
		if (!details::circular_less_than<index_t>(head, currentTailIndex   BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head)))
		{
			return false;
		}
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
		debug::DebugLock lock(mutex);
#endif
		// Find out where we'll be inserting this block in the block index
		BlockIndexEntry *idxEntry;
		if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex))
		{
			return false;
		}

		// Get ahold of a new block
		auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
		if (newBlock == nullptr)
		{
			rewind_block_index_tail();
			idxEntry->value.store(nullptr, std::memory_order_relaxed);
			return false;
		}
#ifdef MCDBGQ_TRACKMEM
		newBlock->owner = this;
#endif
		newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();

		MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T *>(nullptr)) T(std::forward<U>(element))))
		{
			// May throw, try to insert now before we publish the fact that we have this new block
			MOODYCAMEL_TRY
			{
				new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
			}
			MOODYCAMEL_CATCH(...)
			{
				rewind_block_index_tail();
				idxEntry->value.store(nullptr, std::memory_order_relaxed);
				this->parent->add_block_to_free_list(newBlock);
				MOODYCAMEL_RETHROW;
			}
		}

		// Insert the new block into the index
		idxEntry->value.store(newBlock, std::memory_order_relaxed);

		this->tailBlock = newBlock;

		MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T *>(nullptr)) T(std::forward<U>(element))))
		{
			this->tailIndex.store(newTailIndex, std::memory_order_release);
			return true;
		}
	}

	// Enqueue
	new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

	this->tailIndex.store(newTailIndex, std::memory_order_release);
	return true;
}
学新通

3、try_dequeue取出数据

template <typename U>
bool try_dequeue(U &item)
{
	// Instead of simply trying each producer in turn (which could cause needless contention on the first
	// producer), we score them heuristically.
	size_t nonEmptyCount = 0;
	ProducerBase *best = nullptr;
	size_t bestSize = 0;
	for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod())
	{
		auto size = ptr->size_approx();
		if (size > 0)
		{
			if (size > bestSize)
			{
				bestSize = size;
				best = ptr;
			}
			  nonEmptyCount;
		}
	}

	// If there was at least one non-empty queue but it appears empty at the time
	// we try to dequeue from it, we need to make sure every queue's been tried
	if (nonEmptyCount > 0)
	{
		if ((details::likely)(best->dequeue(item)))
		{
			return true;
		}
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod())
		{
			if (ptr != best && ptr->dequeue(item))
			{
				return true;
			}
		}
	}
	return false;
}
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgfggjg
系列文章
更多 icon
同类精品
更多 icon
继续加载