[CPU] FC node tensor parallel #25088
Conversation
@EgorDuplensky @dmitry-gorokhov PR is updated. Please review the PR to get it merged.
```cpp
if (m_cfg.numSubStreams > 0) {
    m_has_sub_compiled_models = true;
    auto sub_cfg = m_cfg;
    sub_cfg.numSubStreams = 0;
    sub_cfg.enableNodeSplit = true;
    auto streams_info_table = m_cfg.streamExecutorConfig.get_streams_info_table();
    auto message = message_manager();
    m_sub_memory_manager = std::make_shared<SubMemoryManager>(m_cfg.numSubStreams);
    message->set_num_sub_streams(m_cfg.numSubStreams);
    for (int i = 0; i < m_cfg.numSubStreams; i++) {
        std::vector<std::vector<int>> sub_streams_table;
        sub_streams_table.push_back(streams_info_table[i + 1]);
        sub_streams_table[0][NUMBER_OF_STREAMS] = 1;
        sub_cfg.streamExecutorConfig = IStreamsExecutor::Config{"CPUStreamsExecutor",
                                                                1,
                                                                1,
                                                                ov::hint::SchedulingCoreType::ANY_CORE,
                                                                false,
                                                                true,
                                                                sub_streams_table,
                                                                sub_cfg.streamsRankTable[i]};
        m_sub_compiled_models.push_back(
            std::make_shared<CompiledModel>(model, plugin, sub_cfg, loaded_from_cache, m_sub_memory_manager));
```
There is a general comment regarding streams configuration.
First, we introduced substreams, which are not exactly the same as the usual streams but have the same goal: execute something within a stream.
Now we have these internal streams, which are much closer to the 'normal' streams (at least they are created using the same interface), but they are still, for some reason, created separately, not within the common stream creation logic.
I think we need (maybe even 'must') to generalize stream creation for all the use cases we have.
So basically, before tensor parallel, we had two main scenarios:
- Single stream, single cpu graph; this cpu graph has no asynchronous execution inside. Only one stream is available for the user and its infer_requests. We call this the 'latency' mode.
- Multiple streams, each using its own cpu graph. These cpu graphs have no asynchronous execution inside (all the nodes are executed sequentially). All the streams are available for the user and its infer_requests. We call this the 'throughput' mode.

Basically, we need to extend this configuration and these abstractions to support two new use cases:
- Multiple streams, multiple cpu graphs; each cpu graph has no asynchronous execution inside. Only one stream is available for the user and its infer_requests, and the rest are available only for the internal plugin logic. This is the tensor parallel solution implemented in scope of your PR.
- Multiple streams, single cpu graph; this cpu graph potentially has asynchronous execution inside. Only one stream is available for the user and its infer_requests, and the rest are available only for the internal plugin logic. The cpu graph has access to all the available streams. This is an alternative tensor parallel solution, where everything is implemented as part of the cpu graph.
We can formalize this as the following table:

| Mode | Total number of streams | Streams available for the user | Number of cpu graphs | cpu graph streams |
|---|---|---|---|---|
| Latency w/o TP | 1 | 1 | 1 | 0 |
| Throughput | N | N | N | 0 |
| Latency TP | N | 1 | N | 0 |
| Latency TP (alt) | N | 1 | 1 | N |
| Throughput TP | M * K | M | M | K |

The last entry, 'throughput tensor parallel', is a theoretical configuration we may or may not need to implement in the future. I have mentioned it just to highlight how 'flexible' the configuration should be.
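To make the idea concrete, here is a minimal sketch of a single descriptor that could express every row of the table; the struct name and fields are illustrative only, not something this PR has to introduce:

```cpp
// Illustrative only: one descriptor that can express every mode in the table above.
struct StreamsLayout {
    int total_streams = 1;   // total number of execution streams created by the plugin
    int user_streams = 1;    // streams exposed to the user and its infer_requests
    int num_graphs = 1;      // number of cpu graphs
    int graph_streams = 0;   // streams owned by a single cpu graph for in-graph parallelism
};

// Latency w/o TP:   {1,     1, 1, 0}
// Throughput:       {N,     N, N, 0}
// Latency TP:       {N,     1, N, 0}
// Latency TP (alt): {N,     1, 1, N}
// Throughput TP:    {M * K, M, M, K}
```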
Hi @EgorDuplensky, the new design of this PR introduces a stream wrapper for the user and sub-streams for model execution. The stream wrapper has the same interface as a normal stream, but without a TBB arena for execution. Sub-streams are exactly the same as normal streams.
Then your table can be updated as follows:
| Mode | Total number of execution streams | Streams available for the user | Number of cpu graphs | Number of sub stream in each user stream |
|---|---|---|---|---|
| Latency w/o TP | 1 | 1 | 1 | 0 |
| Throughput | N | N | N | 0 |
| Latency TP | N | 1 | N | N |
| Latency TP (alt) | N | 1 | 1 | N |
| Throughput TP | M * K | M | M | K |
During model compilation, the stream_info_table is created based on the mode information, which includes the stream wrapper and sub-stream information. Then the StreamsExecutor creates the stream wrapper and sub-streams based on the stream_info_table.
This PR is the implementation of the Latency TP mode, so the changes in the stream_info_table and StreamsExecutor only cover the Latency TP mode as well. When we create a PR for Latency TP (alt), we can extend the stream_info_table and StreamsExecutor to support it.
Sure, I didn't mean that we need to implement this in scope of the current PR, since we don't really have time to do this. I just wanted to highlight the idea.
```cpp
continue;
// ...
const auto element_num = ov::shape_size(weights.get_shape());
if (element_num <= 6600000)
```
Please isolate this magic constant and add a proper description regarding the heuristic.
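For illustration, isolating the threshold could look roughly like this; the constant name and the rationale in the comment are assumptions, not taken from the PR:

```cpp
// Weights smaller than this are not split across sub-streams: for small GEMMs the
// cost of splitting and synchronizing is assumed to outweigh the benefit.
// (Hypothetical name and rationale; the actual heuristic should be documented by the author.)
constexpr size_t kTensorParallelMinWeightElements = 6600000;

const auto element_num = ov::shape_size(weights.get_shape());
if (element_num <= kTensorParallelMinWeightElements)
    continue;
```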
Based on the discussion in the Arch Forum review, the default setting for TP remains disabled, so this heuristic logic will be removed in this PR.
src/plugins/intel_cpu/src/graph.cpp (outdated)

```cpp
    sub_stream_id++;
}
if ((getGraphContext()->getCPUStreamExecutor()) && (getConfig().hintPerfMode == ov::hint::PerformanceMode::LATENCY)) {
    subStreamID = getGraphContext()->getCPUStreamExecutor()->get_numa_node_id();
```
I believe there are no more substreams in the current solution.
Could you please isolate the workaround into a static function?
Could you please clarify whether this is supposed to work for the normal latency mode, without tensor parallel?
done
Hi @EgorDuplensky, the logic of getting the NUMA node id of the stream and using it for memory binding works for the normal latency mode without TP.
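As a sketch of the requested isolation (the helper name, its signature, and the default return value are assumptions; only getCPUStreamExecutor()/get_numa_node_id()/hintPerfMode come from the diff), the workaround could be factored out along these lines:

```cpp
// Returns the NUMA node the current stream is pinned to when running in latency
// mode, so that memory can be bound to it; falls back to node 0 otherwise.
static int getNumaNodeForMemoryBinding(const GraphContext::CPtr& context, const Config& config) {
    if (context->getCPUStreamExecutor() &&
        config.hintPerfMode == ov::hint::PerformanceMode::LATENCY) {
        return context->getCPUStreamExecutor()->get_numa_node_id();
    }
    return 0;  // assumed default: first NUMA node
}
```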
```cpp
FullyConnected::FullyConnected(const std::shared_ptr<ov::Node>& op, const GraphContext::CPtr context)
    : Node(op, context, FCShapeInferFactory(op)),
      errorPrefix("FullyConnected node with name '" + getName() + "'") {
    if (context->getCPUStreamExecutor()) {
```
Could you please isolate all the tensor parallel related logic into separate functions?
> Could you please isolate all the tensor parallel related logic into separate functions?

done
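For illustration only, pulling the tensor parallel setup out of the constructor could look something like the following; the helper name, the tp_cfg member, and the comment-only body are assumptions, not the code from this PR:

```cpp
// Hypothetical helper gathering the tensor parallel setup in one place, so the
// FullyConnected constructor only has to call initTensorParallelConfig(context).
void FullyConnected::initTensorParallelConfig(const GraphContext::CPtr& context) {
    tp_cfg.enable_tensor_parallel = false;
    // Tensor parallel only makes sense when a stream executor is attached to the
    // graph context, i.e. when this graph instance runs on one of the sub-streams.
    if (context->getCPUStreamExecutor()) {
        // query the sub-stream rank/size here and enable splitting accordingly
    }
}
```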
```cpp
}

ExecutorPtr FullyConnected::createExecutor() {
    if (enable_tensor_parallel) {
```
Please isolate
> Please isolate

done
```cpp
    {ARG_DST, dstDescs[0]},
};

if (enable_tensor_parallel && cached_scale) {
```
Please isolate
> Please isolate

done
```cpp
// 1. weight shape is dynamic
// 2. the last dim can be split
// 3. use 6600000 as a threshold to filter out small weights
if (enable_tensor_parallel) {
```
Please isolate
> Please isolate

done
```cpp
void FullyConnected::fuseDecompressionMultiply(const MemoryCPtr& memory) {
    attrs.decompressionMultiplyPtr = memory;
    if (enable_tensor_parallel && !cached_scale) {
```
Please isolate
> Please isolate

done
```cpp
void FullyConnected::fuseDecompressionSubtract(const MemoryCPtr& memory) {
    attrs.decompressionSubtractPtr = memory;
    if (enable_tensor_parallel && !cached_zeropoint) {
```
Please isolate
> Please isolate

done
```cpp
ExecutorPtr executor = nullptr;
std::string errorPrefix;

// tensor parallel
```
Please wrap it up into some "TensorParallelConfig" structure
> Please wrap it up into some "TensorParallelConfig" structure

Done. Created a new struct for managing the tensor parallel config.
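As an illustration of what such a structure might hold (the field set is an assumption based on the flags visible in this diff, such as enable_tensor_parallel and sub_stream_id; w_rank/w_size are assumed names):

```cpp
// Sketch only: groups the tensor parallel state that was previously spread over
// individual FullyConnected members behind a single configuration object.
struct FCTensorParallelConfig {
    bool enable_tensor_parallel = false;  // whether this FC node splits its weights
    int sub_stream_id = -1;               // sub-stream executing this split
    int w_rank = 0;                       // index of this split (assumed field)
    int w_size = 1;                       // total number of splits (assumed field)
};
```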
@EgorDuplensky Your comments are addressed. Please review again. Thanks.
EgorDuplensky left a comment:
LGTM.
One more comment is expected to be addressed.
```cpp
MemoryPtr cached_splited_weight = nullptr;
MemoryPtr cached_splited_bias = nullptr;
MemoryPtr cached_scale = nullptr;
MemoryPtr cached_zeropoint = nullptr;
MemoryPtr cached_dst = nullptr;
MemoryDescPtr memory_desc;
```
Why didn't we include this in the FCTensorParallelConfig? It seems it is only used in the context of the tensor parallel extension.
> Why didn't we include this in the FCTensorParallelConfig? It seems it is only used in the context of the tensor parallel extension.

Yes, moved these to FCTensorParallelConfig.
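Continuing the sketch from above, after the move the structure could look roughly like this; the cached member names are taken from the diff, the rest of the layout is assumed:

```cpp
// Sketch only: the per-split cached tensors live next to the rest of the tensor
// parallel state instead of being plain FullyConnected members.
struct FCTensorParallelConfig {
    bool enable_tensor_parallel = false;
    int sub_stream_id = -1;
    // Cached tensors, only used on the tensor parallel path:
    MemoryPtr cached_splited_weight = nullptr;
    MemoryPtr cached_splited_bias = nullptr;
    MemoryPtr cached_scale = nullptr;
    MemoryPtr cached_zeropoint = nullptr;
    MemoryPtr cached_dst = nullptr;
};
```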
Details:
Tickets: