Skip to content

Commit a30d865

Browse files
authored
Add a pre-flight check when pe_count > cpu_set.weight() (#547)
* Replace the `DCHECK_LE` in the `ReusableFiberEngineFactory` class with an if/throw. * Update `~EdgeHolder` to `LOG(DFATAL)`. The thinking here is that this error is only of use to MRC developers and not of use to downstream users (Borrowed from PR #434). Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Will Killian (https://github.com/willkill07) URL: #547
1 parent 46f92ca commit a30d865

File tree

10 files changed

+176
-9
lines changed

10 files changed

+176
-9
lines changed

.github/workflows/ci_pipe.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ jobs:
201201

202202
codecov:
203203
name: Code Coverage
204-
runs-on: linux-amd64-gpu-v100-latest-1
204+
runs-on: linux-amd64-gpu-l4-latest-1
205205
timeout-minutes: 60
206206
container:
207207
credentials:

cpp/mrc/include/mrc/edge/edge_holder.hpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <memory>
3838
#include <mutex>
3939
#include <optional>
40+
#include <sstream> // for std::stringstream
4041
#include <stdexcept>
4142
#include <type_traits>
4243
#include <typeindex>
@@ -58,12 +59,37 @@ class EdgeHolder
5859

5960
if (this->check_active_connection(false))
6061
{
61-
LOG(FATAL) << "A node was destructed which still had dependent connections. Nodes must be kept alive while "
62-
"dependent connections are still active";
62+
std::stringstream msg;
63+
msg << "EdgeHolder(" << this << ") "
64+
<< "A node was destructed which still had dependent connections. Nodes must be kept alive while "
65+
"dependent connections are still active\n"
66+
<< this->connection_info();
67+
68+
#if defined(NDEBUG)
69+
LOG(ERROR) << msg.str();
70+
#else
71+
LOG(FATAL) << msg.str();
72+
#endif
6373
}
6474
}
6575

6676
protected:
77+
std::string connection_info() const
78+
{
79+
std::stringstream ss;
80+
ss << "m_owned_edge=" << m_owned_edge.lock() << "\tm_owned_edge_lifetime=" << m_owned_edge_lifetime
81+
<< "\tm_connected_edge=" << m_connected_edge;
82+
83+
bool is_connected = false;
84+
if (m_connected_edge)
85+
{
86+
is_connected = m_connected_edge->is_connected();
87+
}
88+
89+
ss << "\tis_connected=" << is_connected << "\tcheck_active_connection=" << this->check_active_connection(false);
90+
return ss.str();
91+
}
92+
6793
bool check_active_connection(bool do_throw = true) const
6894
{
6995
// Alive connection exists when the lock is true, lifetime is false or a connction object has been set

cpp/mrc/src/internal/system/device_info.hpp renamed to cpp/mrc/include/mrc/system/device_info.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct DeviceInfo
4545
static auto PowerLimit(unsigned int device_id) -> double;
4646
static auto PowerUsage(unsigned int device_id) -> double;
4747
static auto UUID(unsigned int device_id) -> std::string;
48+
static void Reset();
4849
// NOLINTEND(readability-identifier-naming)
4950
};
5051

cpp/mrc/src/internal/runnable/engine_factory.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,10 @@ class ReusableFiberEngineFactory final : public FiberEngineFactory
8383

8484
std::vector<std::reference_wrapper<core::FiberTaskQueue>> get_next_n_queues(std::size_t count) final
8585
{
86-
DCHECK_LE(count, m_pool.thread_count());
86+
if (count > m_pool.thread_count())
87+
{
88+
throw exceptions::MrcRuntimeError("more dedicated threads/cores than available");
89+
}
8790
std::vector<std::reference_wrapper<core::FiberTaskQueue>> queues;
8891

8992
for (int i = 0; i < count; ++i)
@@ -193,6 +196,11 @@ class ReusableThreadEngineFactory final : public ThreadEngineFactory
193196
protected:
194197
CpuSet get_next_n_cpus(std::size_t count) final
195198
{
199+
if (count > this->cpu_set().weight())
200+
{
201+
throw exceptions::MrcRuntimeError("more dedicated threads/cores than available");
202+
}
203+
196204
CpuSet cpu_set;
197205
for (int i = 0; i < count; ++i)
198206
{

cpp/mrc/src/internal/system/device_info.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
#include "internal/system/device_info.hpp"
18+
#include "mrc/system/device_info.hpp"
1919

2020
#include "mrc/cuda/common.hpp"
2121

@@ -33,6 +33,7 @@
3333
#include <cstddef>
3434
#include <cstdio>
3535
#include <memory>
36+
#include <new> // for new
3637
#include <ostream>
3738
#include <set>
3839
#include <stdexcept>
@@ -255,6 +256,13 @@ struct NvmlState
255256
return NvmlState::instance().get_handle();
256257
}
257258

259+
static void reset()
260+
{
261+
auto& state = NvmlState::instance();
262+
state.~NvmlState();
263+
new (&state) NvmlState();
264+
}
265+
258266
private:
259267
// this object can also hold the list of device handles that we have access to.
260268
// - nvmlDeviceGetCount_v2 - will tell us the total number of devices we have access to, i.e. the range of [0, N)
@@ -396,4 +404,9 @@ std::string DeviceInfo::UUID(unsigned int device_id)
396404
return buffer.data();
397405
}
398406

407+
void DeviceInfo::Reset()
408+
{
409+
NvmlState::reset();
410+
}
411+
399412
} // namespace mrc::system

cpp/mrc/src/internal/system/topology.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
#include "internal/system/topology.hpp"
1919

20-
#include "internal/system/device_info.hpp"
2120
#include "internal/utils/ranges.hpp"
2221

2322
#include "mrc/core/bitmap.hpp"
2423
#include "mrc/cuda/common.hpp"
2524
#include "mrc/exceptions/runtime_error.hpp"
2625
#include "mrc/options/topology.hpp"
26+
#include "mrc/system/device_info.hpp"
2727

2828
#include <cuda_runtime.h>
2929
#include <glog/logging.h>

cpp/mrc/src/tests/test_topology.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
* limitations under the License.
1616
*/
1717

18-
#include "internal/system/device_info.hpp"
1918
#include "internal/system/gpu_info.hpp"
2019
#include "internal/system/topology.hpp"
2120

2221
#include "mrc/core/bitmap.hpp"
2322
#include "mrc/options/topology.hpp"
23+
#include "mrc/system/device_info.hpp"
2424

2525
#include <glog/logging.h>
2626
#include <gtest/gtest.h>

cpp/mrc/tests/test_edges.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,15 @@ namespace mrc {
5353
TEST_F(TestEdgesDeathTest, NodeDestroyedBeforeEdge)
5454
{
5555
// Reset the sink before the source which will cause an exception
56-
EXPECT_DEATH(
56+
EXPECT_DEBUG_DEATH(
5757
{
5858
auto source = std::make_shared<node::TestSource<int>>();
5959
auto sink = std::make_shared<node::TestSink<int>>();
6060

6161
mrc::make_edge(*source, *sink);
6262
sink.reset();
6363
},
64-
"");
64+
"A node was destructed which still had dependent connections");
6565
}
6666

6767
TEST_F(TestEdges, SourceToSink)

cpp/mrc/tests/test_mrc.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ namespace mrc {
4040
#define SKIP_IF_CODE_COV()
4141
#endif
4242

43+
// Similar to EXPECT_DEBUG_DEATH, this macro is used when a given statement is expected to abort in debug mode,
44+
// but throw an exception in release
45+
#ifdef NDEBUG
46+
#define EXPECT_DEATH_OR_THROW(statement, regex, exception) EXPECT_THROW(statement, exception);
47+
#else
48+
#define EXPECT_DEATH_OR_THROW(statement, regex, exception) EXPECT_DEATH(statement, regex);
49+
#endif
50+
4351
// class that records when it's moved/copied
4452
struct CopyMoveCounter
4553
{

cpp/mrc/tests/test_node.cpp

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,18 @@
2121
#include "mrc/node/rx_node.hpp"
2222
#include "mrc/node/rx_sink.hpp"
2323
#include "mrc/node/rx_source.hpp"
24+
#include "mrc/options/engine_groups.hpp"
2425
#include "mrc/options/options.hpp"
2526
#include "mrc/options/placement.hpp"
2627
#include "mrc/options/topology.hpp"
2728
#include "mrc/pipeline/executor.hpp"
2829
#include "mrc/pipeline/pipeline.hpp"
2930
#include "mrc/runnable/context.hpp"
3031
#include "mrc/runnable/launch_options.hpp"
32+
#include "mrc/runnable/types.hpp" // for EngineType
3133
#include "mrc/segment/builder.hpp"
3234
#include "mrc/segment/object.hpp"
35+
#include "mrc/system/device_info.hpp"
3336
#include "mrc/utils/string_utils.hpp"
3437

3538
#include <boost/fiber/operations.hpp>
@@ -789,4 +792,112 @@ TEST_P(ParallelTests, NodeMultiThread)
789792
EXPECT_EQ(complete_count, 1);
790793
}
791794

795+
struct PeExceedsResourcesParams
796+
{
797+
std::string cpu_set;
798+
std::size_t pe_count;
799+
std::string bad_node;
800+
mrc::runnable::EngineType engine_type;
801+
};
802+
803+
struct PeExceedsTests : public testing::TestWithParam<PeExceedsResourcesParams>
804+
{};
805+
806+
// Run parallel tests for 1, 2 and 4 threads
807+
INSTANTIATE_TEST_SUITE_P(TestNode,
808+
PeExceedsTests,
809+
testing::Values(PeExceedsResourcesParams{"0-9", // cpu set of 10 cores
810+
11, // pe count one greater
811+
"sink",
812+
mrc::runnable::EngineType::Fiber},
813+
PeExceedsResourcesParams{"0-5", // cpu set of 6 cores
814+
11,
815+
"source",
816+
mrc::runnable::EngineType::Fiber},
817+
PeExceedsResourcesParams{"0-3", // cpu set of 4 cores
818+
5, // pe lower than cores
819+
"node",
820+
mrc::runnable::EngineType::Fiber},
821+
PeExceedsResourcesParams{"0-9", // cpu set of 10 cores
822+
11, // pe count one greater
823+
"sink",
824+
mrc::runnable::EngineType::Thread},
825+
PeExceedsResourcesParams{"0-5", // cpu set of 6 cores
826+
11,
827+
"source",
828+
mrc::runnable::EngineType::Thread},
829+
PeExceedsResourcesParams{"0-3", // cpu set of 4 cores
830+
5, // pe lower than cores
831+
"node",
832+
mrc::runnable::EngineType::Thread}));
833+
834+
TEST_P(PeExceedsTests, PeExceedsResources)
835+
{
836+
const auto test_params = GetParam();
837+
838+
EXPECT_DEATH_OR_THROW(
839+
{
840+
// EXPECT_DEATH Executes this test in a fork, we need to reset the NvmlState singleton which would
841+
// otherwise reflect the state of the parent process
842+
mrc::system::DeviceInfo::Reset();
843+
auto p = mrc::make_pipeline();
844+
845+
const std::string cpu_set = test_params.cpu_set;
846+
const std::size_t pe_count = test_params.pe_count;
847+
848+
auto my_segment = p->make_segment("my_segment", [&](segment::IBuilder& seg) {
849+
auto source = seg.make_source<int>("src1", [&](rxcpp::subscriber<int>& s) {
850+
EXPECT_TRUE(false) << "Preflight checks should fail, this should not be "
851+
"called";
852+
});
853+
854+
auto node = seg.make_node<int>("node", rxcpp::operators::map([&](const int& x) {
855+
EXPECT_TRUE(false) << "Preflight checks should fail, this should "
856+
"not be "
857+
"called";
858+
return x;
859+
}));
860+
861+
auto sink = seg.make_sink<int>(
862+
"sink",
863+
[&](const int& x) {
864+
EXPECT_TRUE(false) << "Preflight checks should fail, this should not be "
865+
"called";
866+
},
867+
[&]() {
868+
EXPECT_TRUE(false) << "Preflight checks should fail, this should not be "
869+
"called";
870+
});
871+
872+
if (test_params.bad_node == "source")
873+
{
874+
source->launch_options().pe_count = pe_count;
875+
}
876+
else if (test_params.bad_node == "node")
877+
{
878+
node->launch_options().pe_count = pe_count;
879+
}
880+
else
881+
{
882+
sink->launch_options().pe_count = pe_count;
883+
}
884+
885+
seg.make_edge(source, node);
886+
seg.make_edge(node, sink);
887+
});
888+
889+
auto options = std::make_unique<Options>();
890+
options->topology().user_cpuset(cpu_set);
891+
options->engine_factories().set_default_engine_type(test_params.engine_type);
892+
893+
Executor exec(std::move(options));
894+
exec.register_pipeline(std::move(p));
895+
896+
exec.start();
897+
exec.join();
898+
},
899+
"A node was destructed which still had dependent connections",
900+
std::runtime_error);
901+
}
902+
792903
} // namespace mrc

0 commit comments

Comments
 (0)