Skip to content

Refactored xcomm API #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 5 additions & 227 deletions include/xeus/xcomm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,9 @@ namespace xeus

using function_type = std::function<void(xcomm&&, xmessage)>;

xtarget();
xtarget(const std::string& name, const function_type& callback, xcomm_manager* manager);

std::string& name() & noexcept;
const std::string& name() const & noexcept;
std::string name() const && noexcept;
const std::string& name() const noexcept;

void operator()(xcomm&& comm, xmessage request) const;

Expand Down Expand Up @@ -77,12 +74,10 @@ namespace xeus

using handler_type = std::function<void(xmessage)>;

xcomm() = delete;
explicit xcomm(const xtarget* target, xguid id = xeus::new_xguid());
~xcomm();
xcomm(xcomm&&);
xcomm(const xcomm&);
xcomm(xtarget* target);
xcomm(xtarget* target, xguid id);

xcomm& operator=(xcomm&&);
xcomm& operator=(const xcomm&);
Expand All @@ -91,7 +86,6 @@ namespace xeus
void close(nl::json metadata, nl::json data, buffer_sequence buffers);
void send(nl::json metadata, nl::json data, buffer_sequence buffers) const;

xtarget& target() noexcept;
const xtarget& target() const noexcept;

void handle_message(xmessage request);
Expand Down Expand Up @@ -129,7 +123,7 @@ namespace xeus

handler_type m_close_handler;
handler_type m_message_handler;
xtarget* p_target;
const xtarget* p_target;
xguid m_id;
bool m_moved_from;
};
Expand Down Expand Up @@ -160,11 +154,9 @@ namespace xeus
void comm_close(xmessage request);
void comm_msg(xmessage request);

std::map<xguid, xcomm*>& comms() & noexcept;
const std::map<xguid, xcomm*>& comms() const & noexcept;
std::map<xguid, xcomm*> comms() const && noexcept;
const std::map<xguid, xcomm*>& comms() const noexcept;

xtarget* target(const std::string& target_name);
const xtarget* target(const std::string& target_name) const;

private:

Expand All @@ -180,200 +172,10 @@ namespace xeus
xkernel_core* p_kernel;
};

/**************************
* xtarget implementation *
**************************/

inline xtarget::xtarget()
: m_name()
, m_callback()
, p_manager(nullptr)
{
}

inline xtarget::xtarget(const std::string& name,
const function_type& callback,
xcomm_manager* manager)
: m_name(name)
, m_callback(callback)
, p_manager(manager)
{
}

inline std::string& xtarget::name() & noexcept
{
return m_name;
}

inline const std::string& xtarget::name() const & noexcept
{
return m_name;
}

inline std::string xtarget::name() const && noexcept
{
return m_name;
}

inline void xtarget::operator()(xcomm&& comm, xmessage message) const
{
return m_callback(std::move(comm), std::move(message));
}

inline void xtarget::register_comm(xguid id, xcomm* comm) const
{
p_manager->register_comm(id, comm);
}

inline void xtarget::unregister_comm(xguid id) const
{
p_manager->unregister_comm(id);
}

/************************
* xcomm implementation *
************************/

inline xtarget& xcomm::target() noexcept
{
return *p_target;
}

inline const xtarget& xcomm::target() const noexcept
{
return *p_target;
}

inline void xcomm::handle_close(xmessage message)
{
if (m_close_handler)
{
m_close_handler(std::move(message));
}
}

inline void xcomm::handle_message(xmessage message)
{
if (m_message_handler)
{
m_message_handler(std::move(message));
}
}

inline void xcomm::send_comm_message(const std::string& msg_type,
nl::json metadata,
nl::json data,
buffer_sequence buffers) const
{
nl::json content;
content["comm_id"] = m_id;
content["data"] = std::move(data);
target().publish_message(msg_type, std::move(metadata), std::move(content), std::move(buffers));
}

inline void xcomm::send_comm_message(const std::string& msg_type,
nl::json metadata,
nl::json data,
buffer_sequence buffers,
const std::string& target_name) const
{
nl::json content;
content["comm_id"] = m_id;
content["target_name"] = target_name;
content["data"] = std::move(data);
target().publish_message(msg_type, std::move(metadata), std::move(content), std::move(buffers));
}

inline xcomm::xcomm(xcomm&& comm)
: m_close_handler(std::move(comm.m_close_handler))
, m_message_handler(std::move(comm.m_message_handler))
, p_target(std::move(comm.p_target))
, m_id(std::move(comm.m_id))
, m_moved_from(false)
{
comm.m_moved_from = true;
p_target->register_comm(m_id,
this); // Replacing the address of the moved comm with `this`.
}

inline xcomm::xcomm(const xcomm& comm)
: p_target(comm.p_target)
, m_id(xeus::new_xguid())
, m_moved_from(false)
{
p_target->register_comm(m_id, this);
}

inline xcomm& xcomm::operator=(xcomm&& comm)
{
m_close_handler = std::move(comm.m_close_handler);
m_message_handler = std::move(comm.m_message_handler);
p_target = std::move(comm.p_target);
p_target->unregister_comm(m_id);
m_id = std::move(comm.m_id);
m_moved_from = false;
comm.m_moved_from = true;
p_target->register_comm(m_id,
this); // Replacing the address of the moved comm with `this`.
return *this;
}

inline xcomm& xcomm::operator=(const xcomm& comm)
{
p_target = comm.p_target;
p_target->unregister_comm(m_id);
m_id = new_xguid();
m_moved_from = false;
p_target->register_comm(m_id, this);
return *this;
}

inline xcomm::xcomm(xtarget* target)
: p_target(target)
, m_id(xeus::new_xguid())
{
p_target->register_comm(m_id, this);
}

inline xcomm::xcomm(xtarget* target, xguid id)
: p_target(target)
, m_id(id)
{
p_target->register_comm(m_id, this);
}

inline xcomm::~xcomm()
{
if (!m_moved_from)
{
p_target->unregister_comm(m_id);
}
}

inline void xcomm::open(nl::json metadata, nl::json data, buffer_sequence buffers)
{
send_comm_message("comm_open",
std::move(metadata),
std::move(data),
std::move(buffers),
p_target->name());
}

inline void xcomm::close(nl::json metadata, nl::json data, buffer_sequence buffers)
{
send_comm_message("comm_close", std::move(metadata), std::move(data), std::move(buffers));
}

inline void xcomm::send(nl::json metadata, nl::json data, buffer_sequence buffers) const
{
send_comm_message("comm_msg", std::move(metadata), std::move(data), std::move(buffers));
}

inline xguid xcomm::id() const noexcept
{
return m_id;
}

template <class T>
inline void xcomm::on_message(T&& handler)
{
Expand All @@ -385,30 +187,6 @@ namespace xeus
{
m_close_handler = std::forward<T>(handler);
}

/********************************
* xcomm_manager implementation *
********************************/

inline xtarget* xcomm_manager::target(const std::string& target_name)
{
return &m_targets[target_name];
}

inline std::map<xguid, xcomm*>& xcomm_manager::comms() & noexcept
{
return m_comms;
}

inline const std::map<xguid, xcomm*>& xcomm_manager::comms() const & noexcept
{
return m_comms;
}

inline std::map<xguid, xcomm*> xcomm_manager::comms() const && noexcept
{
return m_comms;
}
}

#endif
Loading
Loading