diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eab6b13 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +/.bundle/ +/.yardoc +/_yardoc/ +/coverage/ +/doc/ +/pkg/ +/spec/reports/ +/tmp/ + +# rspec failure tracking +.rspec_status + +# build artifacts +lib/pulsar/bindings.bundle +*.gem + +spec/pulsar/ext/bindings.bundle +spec/pulsar/ext/bindings.o +spec/pulsar/ext/Makefile + +# Intellij IDEA +.idea/ diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..34c5164 --- /dev/null +++ b/.rspec @@ -0,0 +1,3 @@ +--format documentation +--color +--require spec_helper diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..8e2673c --- /dev/null +++ b/.travis.yml @@ -0,0 +1,27 @@ +dist: bionic # ubuntu 18.04 +sudo: false +services: + - docker +language: ruby +rvm: + - 2.4.9 # oldest pre-install on bionic +install: + - sudo apt-get -y install apt-utils automake + # - sudo apt-get upgrade libstdc++6 + - wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/DEB/apache-pulsar-client.deb + - wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/DEB/apache-pulsar-client-dev.deb + - sudo apt-get -y install ./apache-pulsar-client.deb + - sudo apt-get -y install ./apache-pulsar-client-dev.deb + - gem install bundler -v 1.16.1 + - bundle install +before_script: + - docker run --name pulsar -d -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:latest bin/pulsar standalone + - docker exec pulsar bash -c 'ready (){ (exec > /dev/tcp/$1/$2); }; i=0; while ! ready localhost 8080; do sleep 1; i=$((i+1)); [[ $i -gt 10 ]] && break; done' + - docker exec pulsar bin/pulsar-admin tenants create ruby-client + - docker exec pulsar bin/pulsar-admin namespaces create ruby-client/tests +env: + global: + PULSAR_BROKER_URI: pulsar://localhost:6650 + PULSAR_CLIENT_RUBY_TEST_NAMESPACE: ruby-client/tests +script: + - rake diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..8e2d1ab --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,26 @@ + + +#### Initial contributors + +* Jacob Fugal +* Matteo Merli +* JD Harrington diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..2ccaf5f --- /dev/null +++ b/Gemfile @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +source "https://rubygems.org" + +git_source(:github) {|repo_name| "https://github.com/#{repo_name}" } + +# Specify your gem's dependencies in pulsar-client.gemspec +gemspec diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 0000000..62f3c0e --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,40 @@ +PATH + remote: . + specs: + pulsar-client (2.6.1.pre.beta.2) + rake-compiler (~> 1.0) + rice (~> 2.1) + +GEM + remote: https://rubygems.org/ + specs: + diff-lcs (1.4.4) + rake (10.5.0) + rake-compiler (1.1.1) + rake + rice (2.2.0) + rspec (3.10.0) + rspec-core (~> 3.10.0) + rspec-expectations (~> 3.10.0) + rspec-mocks (~> 3.10.0) + rspec-core (3.10.1) + rspec-support (~> 3.10.0) + rspec-expectations (3.10.1) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.10.0) + rspec-mocks (3.10.2) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.10.0) + rspec-support (3.10.2) + +PLATFORMS + ruby + +DEPENDENCIES + bundler (~> 1.16) + pulsar-client! + rake (~> 10.0) + rspec (~> 3.0) + +BUNDLED WITH + 1.17.3 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..96c2045 --- /dev/null +++ b/NOTICE @@ -0,0 +1,6 @@ + +Apache Pulsar Ruby Client +Copyright 2019 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). diff --git a/README.md b/README.md index ed9f38a..f0c1c71 100644 --- a/README.md +++ b/README.md @@ -1 +1,116 @@ # Apache Pulsar Ruby client + +## Installation + +Add this line to your application's Gemfile: + +```ruby +gem "pulsar-client", "~> 2.6.1.pre" +``` + +And then execute: + + $ bundle + +Or install it yourself as: + + $ gem install pulsar-client --pre + +Note #1: You will need libpulsar (for linking) and libpulsar-dev (for +C++ client header files, for compiling) installed first. For both, the +Gem currently targets version 2.6.1. If your libpulsar is older, it will +fail to compile. If it is newer, it _might_ compile is not guaranteed. + +Note #2: This is a pre-release version of this Gem. You will need the +`--pre` flag to `gem install` to install it manually, and must include +the `.pre` suffix in the Gemfile to install it via Bundler. + +## Usage + +Setup and basic `consumer.receive` example: + +```ruby +# use a standard Pulsar client config (see https://github.com/apache/pulsar/blob/master/conf/client.conf) +# export PULSAR_CLIENT_CONF=/path/to/your/client/conf/client_conf.conf +# OR, if not present have these in your shell with appropriate values +# export PULSAR_BROKER_URI=pulsar://your-pulsar-broker:6651 +# export PULSAR_CERT_PATH=/path/to/your/pulsar-ca.pem +# export PULSAR_AUTH_TOKEN=your-auth-token + +# create client using values from environment +client = Pulsar::Client.from_environment + +# produce a message on the "hello-world" topic in the "namespace" +# namespace of the "tenant" tenant +topic = "tenant/namespace/topic" +producer = client.create_producer(topic) +producer.send("Hello, world!") + +# consumer that message from the topic with an exclusive subscription +# named "hello-consumer" +subscription = "hello-consumer" +consumer = client.subscribe(topic, subscription) + +msg = consumer.receive +message = msg.data +puts "got #{message}" +consumer.acknolwedge(msg) +``` + +Convenience method for listening to messages in a loop: + +```ruby +consumer.listen do |message, _, done| + # process message here; call done to stop the loop. + # messages are auto-acknowledged. + puts "got #{message}" + done.call() +end +``` + +Convenience method for listening on a separate thread: + +```ruby +listenerThread = consumer.listen_in_thread do |message, _, done| + # process message here; call done to stop the loop. + # messages are auto-acknowledged. + puts "got #{message}" + done.call() +end +# ... +listenerThread.join # wait for the thread to finish +``` + +(more documentation coming; see TODO.md) + +## Development + +If your ruby is not already compiled with `--enable-shared`, you'll need +to rebuild it. Example for rbenv: + +``` +CONFIGURE_OPTS="--enable-shared" rbenv install +``` + +If you don't already have them installed, you need libpulsar and +automake for the compilation and linking to work. Example with brew: + +``` +brew install libpulsar automake +``` + +Next, run `bin/setup` to install dependencies -- Rice in particular. +Once that successfully completes, you can `rake compile` to build the +extension. It is then ready to use locally. + +You can run `bin/console` for an interactive prompt that will +allow you to experiment. You can also run `rake spec` to run the tests. + +To install this gem onto your local machine, run `bundle exec rake +install`. + +## Contributing + +Bug reports and pull requests are welcome on GitHub at +https://github.com/apache/pulsar-client-ruby or +https://github.com/instructure/pulsar-client-ruby. diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..4055a85 --- /dev/null +++ b/Rakefile @@ -0,0 +1,10 @@ +require "rspec/core/rake_task" +require "rake/extensiontask" + +RSpec::Core::RakeTask.new(:spec) + +task :default => [:compile, :spec] + +Rake::ExtensionTask.new "bindings" do |ext| + ext.lib_dir = "lib/pulsar" +end diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..2069c4f --- /dev/null +++ b/TODO.md @@ -0,0 +1,24 @@ +# Confirm Licensing and Copyright + +Currently, the repository LICENSE file states APL 2.0, reiterated in the +Ruby source files. The NOTICE file attributes Copyright to the ASF. Get +approval for this from Instructure OSC. + +Aside: The uncompiled C++ in the ext/bindings/ directory are also +distributed, to be compiled on the user's machine during gem install. +Should these then also have license information per file, as the Ruby +files do? + +# README Detail + +The README has very minimal information on installing and building +locally right now. It needs to be fleshed out more. In particular, usage +of the library, specifically around significant divergences from the C++ +code (e.g. `ClientConfiguration#authentication_token=` and +`Consumer#listen`) + +# Write Some Specs + +# Code TODOs + +* `producer.schema` diff --git a/bin/console b/bin/console new file mode 100755 index 0000000..c60569e --- /dev/null +++ b/bin/console @@ -0,0 +1,8 @@ +#!/usr/bin/env ruby + +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), "../lib")) + +require "pulsar/client" +require "irb" + +IRB.start(__FILE__) diff --git a/bin/example-consumer b/bin/example-consumer new file mode 100755 index 0000000..70b288e --- /dev/null +++ b/bin/example-consumer @@ -0,0 +1,48 @@ +#!/usr/bin/env ruby + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), "../lib")) + +require "pulsar/client" + +unless ARGV.size >= 1 && ARGV.size <= 2 && Pulsar::Client.sufficient_environment? + puts "Usage: #{__FILE__} [consumer-name]" + puts + puts "If not specified, the consumer name defaults to 'example-consumer'." + puts + puts "If set, PULSAR_CLIENT_CONF is loaded and used as a configuration." + puts "Broker URI, TLS settings and the authorization token to use is loaded" + puts "from this file." + puts "If this variable is not present, the PULSAR_BROKER_URI environment variable" + puts "must be set. In this case PULSAR_CERT_PATH and PULSAR_AUTH_TOKEN environment" + puts "variables are also recognized." + exit 1 +end + +topic = ARGV[0] +consumer_name = ARGV[1] || 'example-consumer' +client = Pulsar::Client.from_environment +consumer = client.subscribe(topic, consumer_name) +consumer.listen do |message, id, finish| + puts "Received message '#{message}' [id: #{id}]" + finish.call if message == "exit" +end +client.close diff --git a/bin/example-producer b/bin/example-producer new file mode 100755 index 0000000..6257e96 --- /dev/null +++ b/bin/example-producer @@ -0,0 +1,46 @@ +#!/usr/bin/env ruby + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), "../lib")) + +require "pulsar/client" + +unless ARGV.size == 1 && Pulsar::Client.sufficient_environment? + puts "Usage: #{__FILE__} " + puts + puts "If set, PULSAR_CLIENT_CONF is loaded and used as a configuration." + puts "Broker URI, TLS settings and the authorization token to use is loaded" + puts "from this file." + puts "If this variable is not present, the PULSAR_BROKER_URI environment variable" + puts "must be set. In this case PULSAR_CERT_PATH and PULSAR_AUTH_TOKEN environment" + puts "variables are also recognized." + exit 1 +end + +topic = ARGV.shift +client = Pulsar::Client.from_environment +producer = client.create_producer(topic) +while data = gets + data.chomp! + producer.send(data) + break if data == "exit" +end +client.close diff --git a/bin/setup b/bin/setup new file mode 100755 index 0000000..cf4ad25 --- /dev/null +++ b/bin/setup @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +set -euo pipefail +IFS=$'\n\t' +set -vx + +bundle install diff --git a/ext/bindings/bindings.cpp b/ext/bindings/bindings.cpp new file mode 100644 index 0000000..3a8bc4a --- /dev/null +++ b/ext/bindings/bindings.cpp @@ -0,0 +1,20 @@ +#include "rice/Module.hpp" + +#include "message.hpp" +#include "producer.hpp" +#include "consumer.hpp" +#include "client.hpp" +#include "util.hpp" + +using namespace Rice; + +extern "C" +void Init_bindings() +{ + Module rb_mPulsar = define_module("Pulsar"); + bind_errors(rb_mPulsar); + bind_message(rb_mPulsar); + bind_producer(rb_mPulsar); + bind_consumer(rb_mPulsar); + bind_client(rb_mPulsar); +} diff --git a/ext/bindings/client.cpp b/ext/bindings/client.cpp new file mode 100644 index 0000000..372b1df --- /dev/null +++ b/ext/bindings/client.cpp @@ -0,0 +1,225 @@ +#include "rice/Data_Type.hpp" +#include "rice/Constructor.hpp" +#include +#include + +#include "client.hpp" +#include "logger.hpp" +#include "util.hpp" +#include "vector.hpp" + +namespace pulsar_rb { + +ClientConfiguration::ClientConfiguration() : _config() { +} + +void ClientConfiguration::setAuthFromToken(const std::string &token) { + _config.setAuth(pulsar::AuthToken::createWithToken(token)); +} + +int ClientConfiguration::getOperationTimeoutSeconds() { + return _config.getOperationTimeoutSeconds(); +} + +void ClientConfiguration::setOperationTimeoutSeconds(int timeout) { + _config.setOperationTimeoutSeconds(timeout); +} + +int ClientConfiguration::getIOThreads() { + return _config.getIOThreads(); +} + +void ClientConfiguration::setIOThreads(int threads) { + _config.setIOThreads(threads); +} + +int ClientConfiguration::getMessageListenerThreads() { + return _config.getMessageListenerThreads(); +} + +void ClientConfiguration::setMessageListenerThreads(int threads) { + _config.setMessageListenerThreads(threads); +} + +int ClientConfiguration::getConcurrentLookupRequest() { + return _config.getConcurrentLookupRequest(); +} + +void ClientConfiguration::setConcurrentLookupRequest(int n) { + _config.setConcurrentLookupRequest(n); +} + +std::string ClientConfiguration::getLogConfFilePath() { + return _config.getLogConfFilePath(); +} + +void ClientConfiguration::setLogConfFilePath(const std::string& path) { + _config.setLogConfFilePath(path); +} + +void ClientConfiguration::setSilentLogging(bool enable) { + // The logger can only be set once, so if it's already on we cannot disable it. + if (silentLogging) { + if (!enable) { + throw Rice::Exception(rb_eArgError, "silent_logging can only be set once"); + } + } + + if (enable) { + silentLogging = true; + std::unique_ptr loggerFactory = SilentLoggerFactory::create(); + _config.setLogger(loggerFactory.release()); + } +} + +bool ClientConfiguration::getSilentLogging() { + return silentLogging; +} + +bool ClientConfiguration::isUseTls() { + return _config.isUseTls(); +} + +void ClientConfiguration::setUseTls(bool enable) { + _config.setUseTls(enable); +} + +std::string ClientConfiguration::getTlsTrustCertsFilePath() { + return _config.getTlsTrustCertsFilePath(); +} + +void ClientConfiguration::setTlsTrustCertsFilePath(const std::string& path) { + _config.setTlsTrustCertsFilePath(path); +} + +bool ClientConfiguration::isTlsAllowInsecureConnection() { + return _config.isTlsAllowInsecureConnection(); +} + +void ClientConfiguration::setTlsAllowInsecureConnection(bool enable) { + _config.setTlsAllowInsecureConnection(enable); +} + +bool ClientConfiguration::isValidateHostName() { + return _config.isValidateHostName(); +} + +void ClientConfiguration::setValidateHostName(bool enable) { + _config.setValidateHostName(enable); +} + +Client::Client(Rice::String service_url, const ClientConfiguration& config) : _client(service_url.str(), config._config) { +} + +typedef struct { + pulsar::Client& client; + const Rice::String& topic; + const pulsar::ProducerConfiguration& config; + pulsar::Producer producer; + pulsar::Result result; +} client_create_producer_task; + +void* client_create_producer_worker(void* taskPtr) { + client_create_producer_task& task = *(client_create_producer_task*)taskPtr; + task.result = task.client.createProducer(task.topic.str(), task.config, task.producer); + return nullptr; +} + +Producer::ptr Client::create_producer(Rice::String topic, const ProducerConfiguration& config) { + client_create_producer_task task = { _client, topic, config }; + rb_thread_call_without_gvl(&client_create_producer_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); + return Producer::ptr(new Producer(task.producer)); +} + +typedef struct { + pulsar::Client& client; + const Rice::Array& topics; + const Rice::String& subscriptionName; + const pulsar::ConsumerConfiguration& config; + pulsar::Consumer consumer; + pulsar::Result result; +} client_subscribe_task; + +void* client_subscribe_worker(void* taskPtr) { + client_subscribe_task& task = *(client_subscribe_task*)taskPtr; + + const std::vector& topics = from_ruby>(task.topics); + switch(topics.size()) { + case 0: + throw Rice::Exception(rb_eArgError, "Must have at least one topic"); + break; + case 1: { + // Skip the MultiTopicsConsumer if there's only one. + task.result = task.client.subscribe(topics[0], task.subscriptionName.str(), task.config, task.consumer); + break; + } + default: { + task.result = task.client.subscribe(topics, task.subscriptionName.str(), task.config, task.consumer); + break; + } + } + return nullptr; +} + +Consumer::ptr Client::subscribe(Rice::Array topics, Rice::String subscriptionName, const ConsumerConfiguration& config) { + client_subscribe_task task = { _client, topics, subscriptionName, config }; + rb_thread_call_without_gvl(&client_subscribe_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); + return Consumer::ptr(new Consumer(task.consumer)); +} + +typedef struct { + pulsar::Client& client; + pulsar::Result result; +} client_close_task; + +void* client_close_worker(void* taskPtr) { + client_close_task& task = *(client_close_task*)taskPtr; + task.result = task.client.close(); + return nullptr; +} + +void Client::close() { + client_close_task task = { _client }; + rb_thread_call_without_gvl(&client_close_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} + +} + +using namespace Rice; + +void bind_client(Module& module) { + define_class_under(module, "Client") + .define_constructor(Constructor()) + .define_method("create_producer", &pulsar_rb::Client::create_producer) + .define_method("subscribe", &pulsar_rb::Client::subscribe) + .define_method("close", &pulsar_rb::Client::close) + ; + + define_class_under(module, "ClientConfiguration") + .define_constructor(Constructor()) + .define_method("authentication_token=", &pulsar_rb::ClientConfiguration::setAuthFromToken) + .define_method("operation_timeout_seconds", &pulsar_rb::ClientConfiguration::getOperationTimeoutSeconds) + .define_method("operation_timeout_seconds=", &pulsar_rb::ClientConfiguration::setOperationTimeoutSeconds) + .define_method("io_threads", &pulsar_rb::ClientConfiguration::getIOThreads) + .define_method("io_threads=", &pulsar_rb::ClientConfiguration::setIOThreads) + .define_method("message_listener_threads", &pulsar_rb::ClientConfiguration::getMessageListenerThreads) + .define_method("message_listener_threads=", &pulsar_rb::ClientConfiguration::setMessageListenerThreads) + .define_method("concurrent_lookup_requests", &pulsar_rb::ClientConfiguration::getConcurrentLookupRequest) + .define_method("concurrent_lookup_requests=", &pulsar_rb::ClientConfiguration::setConcurrentLookupRequest) + .define_method("log_conf_file_path", &pulsar_rb::ClientConfiguration::getLogConfFilePath) + .define_method("log_conf_file_path=", &pulsar_rb::ClientConfiguration::setLogConfFilePath) + .define_method("silent_logging?", &pulsar_rb::ClientConfiguration::getSilentLogging) + .define_method("silent_logging=", &pulsar_rb::ClientConfiguration::setSilentLogging) + .define_method("use_tls?", &pulsar_rb::ClientConfiguration::isUseTls) + .define_method("use_tls=", &pulsar_rb::ClientConfiguration::setUseTls) + .define_method("tls_trust_certs_file_path", &pulsar_rb::ClientConfiguration::getTlsTrustCertsFilePath) + .define_method("tls_trust_certs_file_path=", &pulsar_rb::ClientConfiguration::setTlsTrustCertsFilePath) + .define_method("tls_allow_insecure_connection?", &pulsar_rb::ClientConfiguration::isTlsAllowInsecureConnection) + .define_method("tls_allow_insecure_connection=", &pulsar_rb::ClientConfiguration::setTlsAllowInsecureConnection) + .define_method("tls_validate_hostname?", &pulsar_rb::ClientConfiguration::isValidateHostName) + .define_method("tls_validate_hostname=", &pulsar_rb::ClientConfiguration::setValidateHostName) + ; +} diff --git a/ext/bindings/client.hpp b/ext/bindings/client.hpp new file mode 100644 index 0000000..8b9a9c3 --- /dev/null +++ b/ext/bindings/client.hpp @@ -0,0 +1,59 @@ +#ifndef __PULSAR_RUBY_CLIENT_CLIENT_HPP +#define __PULSAR_RUBY_CLIENT_CLIENT_HPP + +#include "rice/Module.hpp" +#include "rice/String.hpp" +#include "rice/Data_Object.hpp" +#include + +#include "producer.hpp" +#include "consumer.hpp" + +namespace pulsar_rb { + class ClientConfiguration { + public: + pulsar::ClientConfiguration _config; + bool silentLogging = false; + ClientConfiguration(); + + void setAuthFromToken(const std::string &token); + int getOperationTimeoutSeconds(); + void setOperationTimeoutSeconds(int timeout); + int getIOThreads(); + void setIOThreads(int threads); + int getMessageListenerThreads(); + void setMessageListenerThreads(int threads); + int getConcurrentLookupRequest(); + void setConcurrentLookupRequest(int n); + std::string getLogConfFilePath(); + void setLogConfFilePath(const std::string& path); + void setSilentLogging(bool); + bool getSilentLogging(); + bool isUseTls(); + void setUseTls(bool enable); + std::string getTlsTrustCertsFilePath(); + void setTlsTrustCertsFilePath(const std::string& path); + bool isTlsAllowInsecureConnection(); + void setTlsAllowInsecureConnection(bool enable); + bool isValidateHostName(); + void setValidateHostName(bool enable); + + typedef Rice::Data_Object ptr; + }; + + class Client { + public: + pulsar::Client _client; + Client(Rice::String service_url, const ClientConfiguration& config); + + Producer::ptr create_producer(Rice::String topic, const ProducerConfiguration& config); + Consumer::ptr subscribe(Rice::Array topics, Rice::String subscriptionName, const ConsumerConfiguration& config); + void close(); + + typedef Rice::Data_Object ptr; + }; +}; + +void bind_client(Rice::Module &module); + +#endif diff --git a/ext/bindings/consumer.cpp b/ext/bindings/consumer.cpp new file mode 100644 index 0000000..ca8fd42 --- /dev/null +++ b/ext/bindings/consumer.cpp @@ -0,0 +1,117 @@ +#include "rice/Data_Type.hpp" +#include "rice/Enum.hpp" +#include "rice/Constructor.hpp" +#include +#include + +#include "consumer.hpp" +#include "util.hpp" + +namespace pulsar_rb { + +typedef struct { + pulsar::Consumer& consumer; + unsigned int timeout_ms; + pulsar::Message message; + pulsar::Result result; +} consumer_receive_job; + +typedef struct { + pulsar::Consumer& consumer; + pulsar::Result result; +} consumer_close_task; + +void* consumer_receive_nogvl(void* jobPtr) { + consumer_receive_job& job = *(consumer_receive_job*)jobPtr; + if (job.timeout_ms > 0) { + job.result = job.consumer.receive(job.message, job.timeout_ms); + } else { + job.result = job.consumer.receive(job.message); + } + return nullptr; +} + +pulsar::Message consumer_receive(pulsar::Consumer& consumer, unsigned int timeout_ms) { + consumer_receive_job job = { consumer, timeout_ms }; + rb_thread_call_without_gvl(&consumer_receive_nogvl, &job, RUBY_UBF_IO, nullptr); + CheckResult(job.result); + return job.message; +} + +Message::ptr Consumer::receive(unsigned int timeout_ms) { + pulsar::Message message = consumer_receive(_consumer, timeout_ms); + return Message::ptr(new Message(message)); +} + +void Consumer::acknowledge(const Message& message) { + _consumer.acknowledgeAsync(message._msg, nullptr); +} + +void Consumer::negative_acknowledge(const Message& message) { + _consumer.negativeAcknowledge(message._msg); +} + +void* consumer_close_worker(void* taskPtr) { + consumer_close_task& task = *(consumer_close_task*)taskPtr; + task.result = task.consumer.close(); + return nullptr; +} + +void Consumer::close() { + consumer_close_task task = { _consumer }; + rb_thread_call_without_gvl(&consumer_close_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} + +} + +using namespace Rice; + +void bind_consumer(Module &module) { + define_class_under(module, "Consumer") + .define_constructor(Constructor()) + .define_method("receive", &pulsar_rb::Consumer::receive, (Arg("timeout_ms") = 0)) + .define_method("acknowledge", &pulsar_rb::Consumer::acknowledge) + .define_method("negative_acknowledge", &pulsar_rb::Consumer::negative_acknowledge) + .define_method("close", &pulsar_rb::Consumer::close) + ; + + define_enum("ConsumerType", module) + .define_value("Exclusive", ConsumerExclusive) + .define_value("Shared", ConsumerShared) + .define_value("Failover", ConsumerFailover) + .define_value("KeyShared", ConsumerKeyShared); + + define_enum("InitialPosition", module) + .define_value("Latest", InitialPositionLatest) + .define_value("Earliest", InitialPositionEarliest); + + define_class_under(module, "ConsumerConfiguration") + .define_constructor(Constructor()) + .define_method("consumer_type", &ConsumerConfiguration::getConsumerType) + .define_method("consumer_type=", &ConsumerConfiguration::setConsumerType) + // TODO .define_method("schema", &ConsumerConfiguration::getSchema) + // TODO .define_method("schema=", &ConsumerConfiguration::setSchema) + // TODO .define_method("message_listener", &ConsumerConfiguration_setMessageListener) + .define_method("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize) + .define_method("receiver_queue_size=", &ConsumerConfiguration::setReceiverQueueSize) + .define_method("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions) + .define_method("max_total_receiver_queue_size_across_partitions=", &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) + .define_method("consumer_name", &ConsumerConfiguration::getConsumerName) + .define_method("consumer_name=", &ConsumerConfiguration::setConsumerName) + .define_method("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) + .define_method("unacked_messages_timeout_ms=", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) + .define_method("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs) + .define_method("negative_ack_redelivery_delay_ms=", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs) + .define_method("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs) + .define_method("broker_consumer_stats_cache_time_ms=", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs) + .define_method("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod) + .define_method("pattern_auto_discovery_period=", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod) + .define_method("read_compacted?", &ConsumerConfiguration::isReadCompacted) + .define_method("read_compacted=", &ConsumerConfiguration::setReadCompacted) + .define_method("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) + .define_method("subscription_initial_position=", &ConsumerConfiguration::setSubscriptionInitialPosition) + .define_method("[]", &ConsumerConfiguration::getProperty) + .define_method("[]=", &ConsumerConfiguration::setProperty) + ; +} diff --git a/ext/bindings/consumer.hpp b/ext/bindings/consumer.hpp new file mode 100644 index 0000000..14ad46d --- /dev/null +++ b/ext/bindings/consumer.hpp @@ -0,0 +1,32 @@ +#ifndef __PULSAR_RUBY_CLIENT_CONSUMER_HPP +#define __PULSAR_RUBY_CLIENT_CONSUMER_HPP + +#include "rice/Module.hpp" +#include "rice/Data_Object.hpp" +#include + +#include "message.hpp" + +namespace pulsar_rb { + class Consumer { + public: + pulsar::Consumer _consumer; + Consumer() {}; + Consumer(const pulsar::Consumer& consumer) : _consumer(consumer) {} + + Message::ptr receive(unsigned int timeout_ms=0); + void acknowledge(const Message& message); + void negative_acknowledge(const Message& message); + void close(); + + typedef Rice::Data_Object ptr; + }; + + // direct typedef instead of wrapping because implementations don't need any + // wrapping. but still re-namespaced for consistency + typedef pulsar::ConsumerConfiguration ConsumerConfiguration; +}; + +void bind_consumer(Rice::Module& module); + +#endif diff --git a/ext/bindings/extconf.rb b/ext/bindings/extconf.rb new file mode 100644 index 0000000..6533b54 --- /dev/null +++ b/ext/bindings/extconf.rb @@ -0,0 +1,4 @@ +require 'mkmf-rice' +$LOCAL_LIBS << "-lpulsar" +$CXXFLAGS += " -std=c++11 " +create_makefile('pulsar/bindings') diff --git a/ext/bindings/logger.cpp b/ext/bindings/logger.cpp new file mode 100644 index 0000000..8cfb8a2 --- /dev/null +++ b/ext/bindings/logger.cpp @@ -0,0 +1,23 @@ +#include + +#include "logger.hpp" + +namespace pulsar_rb { + +class SilentLogger : public pulsar::Logger { + std::string _logger; + + public: + SilentLogger(const std::string &logger) : _logger(logger) {} + + bool isEnabled(Level level) { return false; } + void log(Level level, int line, const std::string &message) { } +}; + +pulsar::Logger *SilentLoggerFactory::getLogger(const std::string& file) { return new SilentLogger(file); } + +std::unique_ptr SilentLoggerFactory::create() { + return std::unique_ptr(new SilentLoggerFactory()); +} + +} diff --git a/ext/bindings/logger.hpp b/ext/bindings/logger.hpp new file mode 100644 index 0000000..604c947 --- /dev/null +++ b/ext/bindings/logger.hpp @@ -0,0 +1,17 @@ +#ifndef __PULSAR_RUBY_CLIENT_LOGGER_HPP +#define __PULSAR_RUBY_CLIENT_LOGGER_HPP + +#include + +namespace pulsar_rb { + +class SilentLoggerFactory : public pulsar::LoggerFactory { + public: + pulsar::Logger* getLogger(const std::string& fileName); + + static std::unique_ptr create(); +}; + +} + +#endif diff --git a/ext/bindings/message.cpp b/ext/bindings/message.cpp new file mode 100644 index 0000000..e99033b --- /dev/null +++ b/ext/bindings/message.cpp @@ -0,0 +1,96 @@ +#include "rice/Data_Type.hpp" +#include "rice/Constructor.hpp" +#include "rice/Exception.hpp" +#include + +#include "message.hpp" +#include "stringmap.hpp" + +namespace pulsar_rb { + +Rice::String MessageId::toString() { + std::stringstream ss; + ss << _msgId; + return Rice::String(ss.str()); +} + +Message::Message(const std::string& data, Rice::Object arg = Rice::Object()) { + pulsar::MessageBuilder mb; + mb.setContent(data); + + if (arg && arg.rb_type() != T_NONE) { + Rice::Hash opts = Rice::Hash(arg); + Rice::Hash::iterator it = opts.begin(); + Rice::Hash::iterator end = opts.end(); + std::string key; + for (; it != end; ++it) { + key = it->key.to_s().str(); + if (key == "properties"){ + Rice::Object value = Rice::Object(it->value); + if (value.rb_type() != T_NIL) { + mb.setProperties(from_ruby(value)); + } + } else if (key == "partition_key") { + mb.setPartitionKey(Rice::Object(it->value).to_s().str()); + } else if (key == "ordering_key") { + mb.setOrderingKey(Rice::Object(it->value).to_s().str()); + } else { + throw Rice::Exception(rb_eArgError, "Unknown keyword argument: %s", key.c_str()); + } + } + } + + _msg = mb.build(); + received = false; +} + +Rice::String Message::getData() { + std::string str((const char*)_msg.getData(), _msg.getLength()); + return Rice::String(str); +} + +MessageId::ptr Message::getMessageId() { + pulsar::MessageId messageId = _msg.getMessageId(); + return MessageId::ptr(new MessageId(messageId)); +} + +Rice::Hash Message::getProperties() { + return to_ruby(_msg.getProperties()); +} + +Rice::String Message::getPartitionKey() { + return to_ruby(_msg.getPartitionKey()); +} + +Rice::String Message::getOrderingKey() { + return to_ruby(_msg.getOrderingKey()); +} + +Rice::Object Message::getTopicName() { + // If the message topic hasn't been set (it gets set when received, not when + // built) getTopicName will try to dereference a null pointer. + return received ? to_ruby(_msg.getTopicName()) : Rice::Nil; +} + +} + +using namespace Rice; + +void bind_message(Module& module) { + define_class_under(module, "MessageId") + .define_constructor(Constructor()) + .define_method("to_s", &pulsar_rb::MessageId::toString) + ; + + define_class_under(module, "Message") + .define_constructor(Constructor()) + .define_constructor(Constructor(), + (Rice::Arg("data"), Rice::Arg("options") = Rice::Object())) + .define_method("data", &pulsar_rb::Message::getData) + .define_method("message_id", &pulsar_rb::Message::getMessageId) + .define_method("properties", &pulsar_rb::Message::getProperties) + .define_method("partition_key", &pulsar_rb::Message::getPartitionKey) + .define_method("ordering_key", &pulsar_rb::Message::getOrderingKey) + .define_method("topic", &pulsar_rb::Message::getTopicName) + ; +} diff --git a/ext/bindings/message.hpp b/ext/bindings/message.hpp new file mode 100644 index 0000000..2422aa0 --- /dev/null +++ b/ext/bindings/message.hpp @@ -0,0 +1,41 @@ +#ifndef __PULSAR_RUBY_CLIENT_MESSAGE_HPP +#define __PULSAR_RUBY_CLIENT_MESSAGE_HPP + +#include "rice/Hash.hpp" +#include "rice/Module.hpp" +#include "rice/String.hpp" +#include "rice/Data_Object.hpp" +#include + +namespace pulsar_rb { + class MessageId { + public: + pulsar::MessageId _msgId; + MessageId(const pulsar::MessageId& msgId) : _msgId(msgId) {}; + + Rice::String toString(); + + typedef Rice::Data_Object ptr; + }; + + class Message { + public: + pulsar::Message _msg; + bool received = true; // received from consumer rather than built with builder + Message(const pulsar::Message& msg) : _msg(msg) {}; + Message(const std::string& data, Rice::Object opts); + + Rice::String getData(); + MessageId::ptr getMessageId(); + Rice::Hash getProperties(); + Rice::String getPartitionKey(); + Rice::String getOrderingKey(); + Rice::Object getTopicName(); + + typedef Rice::Data_Object ptr; + }; +}; + +void bind_message(Rice::Module& module); + +#endif diff --git a/ext/bindings/producer.cpp b/ext/bindings/producer.cpp new file mode 100644 index 0000000..7c2aec4 --- /dev/null +++ b/ext/bindings/producer.cpp @@ -0,0 +1,87 @@ +#include "rice/Data_Type.hpp" +#include "rice/Constructor.hpp" +#include +#include + +#include "producer.hpp" +#include "util.hpp" + +namespace pulsar_rb { + +typedef struct { + pulsar::Producer& producer; + const pulsar::Message& message; + pulsar::Result result; +} producer_send_task; + +typedef struct { + pulsar::Producer& producer; + pulsar::Result result; +} producer_close_task; + +void* producer_send_worker(void* taskPtr) { + producer_send_task& task = *(producer_send_task*)taskPtr; + task.result = task.producer.send(task.message); + return nullptr; +} + +void Producer::send(const Message& message) { + producer_send_task task = { _producer, message._msg }; + rb_thread_call_without_gvl(&producer_send_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} + +void* producer_close_worker(void* taskPtr) { + producer_close_task& task = *(producer_close_task*)taskPtr; + task.result = task.producer.close(); + return nullptr; +} + +void Producer::close() { + producer_close_task task = { _producer }; + rb_thread_call_without_gvl(&producer_close_worker, &task, RUBY_UBF_IO, nullptr); + CheckResult(task.result); +} + +} + +using namespace Rice; + +void bind_producer(Module& module) { + define_class_under(module, "Producer") + .define_constructor(Constructor()) + .define_method("send", &pulsar_rb::Producer::send) + .define_method("close", &pulsar_rb::Producer::close) + ; + + define_class_under(module, "ProducerConfiguration") + .define_constructor(Constructor()) + .define_method("producer_name", &ProducerConfiguration::getProducerName) + .define_method("producer_name=", &ProducerConfiguration::setProducerName) + // TODO .define_method("schema", &ProducerConfiguration::getSchema) + // TODO .define_method("schema=", &ProducerConfiguration::setSchema) + .define_method("send_timeout_millis", &ProducerConfiguration::getSendTimeout) + .define_method("send_timeout_millis=", &ProducerConfiguration::setSendTimeout) + .define_method("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId) + .define_method("initial_sequence_id=", &ProducerConfiguration::setInitialSequenceId) + .define_method("compression_type", &ProducerConfiguration::getCompressionType) + .define_method("compression_type=", &ProducerConfiguration::setCompressionType) + .define_method("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages) + .define_method("max_pending_messages=", &ProducerConfiguration::setMaxPendingMessages) + .define_method("max_pending_messages_across_partitions", &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions) + .define_method("max_pending_messages_across_partitions=", &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions) + .define_method("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull) + .define_method("block_if_queue_full=", &ProducerConfiguration::setBlockIfQueueFull) + .define_method("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode) + .define_method("partitions_routing_mode=", &ProducerConfiguration::setPartitionsRoutingMode) + .define_method("batching_enabled", &ProducerConfiguration::getBatchingEnabled) + .define_method("batching_enabled=", &ProducerConfiguration::setBatchingEnabled) + .define_method("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages) + .define_method("batching_max_messages=", &ProducerConfiguration::setBatchingMaxMessages) + .define_method("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes) + .define_method("batching_max_allowed_size_in_bytes=", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes) + .define_method("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs) + .define_method("batching_max_publish_delay_ms=", &ProducerConfiguration::setBatchingMaxPublishDelayMs) + .define_method("[]", &ProducerConfiguration::getProperty) + .define_method("[]=", &ProducerConfiguration::setProperty); +} diff --git a/ext/bindings/producer.hpp b/ext/bindings/producer.hpp new file mode 100644 index 0000000..d8272b7 --- /dev/null +++ b/ext/bindings/producer.hpp @@ -0,0 +1,30 @@ +#ifndef __PULSAR_RUBY_CLIENT_PRODUCER_HPP +#define __PULSAR_RUBY_CLIENT_PRODUCER_HPP + +#include "rice/Module.hpp" +#include "rice/Data_Object.hpp" +#include + +#include "message.hpp" + +namespace pulsar_rb { + class Producer { + public: + pulsar::Producer _producer; + Producer() {}; + Producer(const pulsar::Producer& producer) : _producer(producer) {} + + void send(const Message& message); + void close(); + + typedef Rice::Data_Object ptr; + }; + + // direct typedef instead of wrapping because implementations don't need any + // wrapping. but still re-namespaced for consistency + typedef pulsar::ProducerConfiguration ProducerConfiguration; +}; + +void bind_producer(Rice::Module& module); + +#endif diff --git a/ext/bindings/stringmap.cpp b/ext/bindings/stringmap.cpp new file mode 100644 index 0000000..69bcfc0 --- /dev/null +++ b/ext/bindings/stringmap.cpp @@ -0,0 +1,33 @@ +#include "rice/Hash.hpp" + +#include "stringmap.hpp" + +template<> +pulsar::StringMap from_ruby(Rice::Object o) +{ + Rice::Hash h(o); + pulsar::StringMap m; + Rice::Hash::iterator it = h.begin(); + Rice::Hash::iterator end = h.end(); + for(; it != end; ++it) { + m[it->key.to_s().str()] = Rice::Object(it->value).to_s().str(); + } + return m; +} + +template<> +Rice::Object to_ruby(pulsar::StringMap const & m) +{ + Rice::Hash h = Rice::Hash(); + + pulsar::StringMap::const_iterator it = m.begin(); + pulsar::StringMap::const_iterator end = m.end(); + for(; it != end; ++it) { + h[Rice::String(it->first)] = Rice::String(it->second); + } + + // Be clear that calling hash[x]= will have no effect. + h.freeze(); + + return h; +} diff --git a/ext/bindings/stringmap.hpp b/ext/bindings/stringmap.hpp new file mode 100644 index 0000000..fb4ed7c --- /dev/null +++ b/ext/bindings/stringmap.hpp @@ -0,0 +1,13 @@ +#ifndef __PULSAR_RUBY_CLIENT_STRINGMAP_HPP +#define __PULSAR_RUBY_CLIENT_STRINGMAP_HPP + +#include "rice/Object.hpp" +#include + +template<> +pulsar::StringMap from_ruby(Rice::Object o); + +template<> +Rice::Object to_ruby(pulsar::StringMap const & m); + +#endif diff --git a/ext/bindings/util.cpp b/ext/bindings/util.cpp new file mode 100644 index 0000000..578a797 --- /dev/null +++ b/ext/bindings/util.cpp @@ -0,0 +1,183 @@ +#include + +#include "util.hpp" +#include "rice/Exception.hpp" + +using namespace Rice; + +// Result enum found in pulsar-*/pulsar-client-cpp/include/pulsar/Result.h + +VALUE rb_ePulsarError = Qnil; +VALUE rb_ePulsarError_UnknownError = Qnil; +VALUE rb_ePulsarError_InvalidConfiguration = Qnil; +VALUE rb_ePulsarError_Timeout = Qnil; +VALUE rb_ePulsarError_LookupError = Qnil; +VALUE rb_ePulsarError_ConnectError = Qnil; +VALUE rb_ePulsarError_ReadError = Qnil; +VALUE rb_ePulsarError_AuthenticationError = Qnil; +VALUE rb_ePulsarError_AuthorizationError = Qnil; +VALUE rb_ePulsarError_ErrorGettingAuthenticationData = Qnil; +VALUE rb_ePulsarError_BrokerMetadataError = Qnil; +VALUE rb_ePulsarError_BrokerPersistenceError = Qnil; +VALUE rb_ePulsarError_ChecksumError = Qnil; +VALUE rb_ePulsarError_ConsumerBusy = Qnil; +VALUE rb_ePulsarError_NotConnected = Qnil; +VALUE rb_ePulsarError_AlreadyClosed = Qnil; +VALUE rb_ePulsarError_InvalidMessage = Qnil; +VALUE rb_ePulsarError_ConsumerNotInitialized = Qnil; +VALUE rb_ePulsarError_ProducerNotInitialized = Qnil; +VALUE rb_ePulsarError_ProducerBusy = Qnil; +VALUE rb_ePulsarError_TooManyLookupRequestException = Qnil; +VALUE rb_ePulsarError_InvalidTopicName = Qnil; +VALUE rb_ePulsarError_InvalidUrl = Qnil; +VALUE rb_ePulsarError_ServiceUnitNotReady = Qnil; +VALUE rb_ePulsarError_OperationNotSupported = Qnil; +VALUE rb_ePulsarError_ProducerBlockedQuotaExceededError = Qnil; +VALUE rb_ePulsarError_ProducerBlockedQuotaExceededException = Qnil; +VALUE rb_ePulsarError_ProducerQueueIsFull = Qnil; +VALUE rb_ePulsarError_MessageTooBig = Qnil; +VALUE rb_ePulsarError_TopicNotFound = Qnil; +VALUE rb_ePulsarError_SubscriptionNotFound = Qnil; +VALUE rb_ePulsarError_ConsumerNotFound = Qnil; +VALUE rb_ePulsarError_UnsupportedVersionError = Qnil; +VALUE rb_ePulsarError_TopicTerminated = Qnil; +VALUE rb_ePulsarError_CryptoError = Qnil; +VALUE rb_ePulsarError_IncompatibleSchema = Qnil; +VALUE rb_ePulsarError_ConsumerAssignError = Qnil; +VALUE rb_ePulsarError_CumulativeAcknowledgementNotAllowedError = Qnil; +VALUE rb_ePulsarError_TransactionCoordinatorNotFoundError = Qnil; +VALUE rb_ePulsarError_InvalidTxnStatusError = Qnil; +VALUE rb_ePulsarError_NotAllowedError = Qnil; + +void bind_errors(Module &module) { + rb_ePulsarError = rb_define_class_under(module.value(), "Error", rb_eStandardError); + rb_ePulsarError_UnknownError = rb_define_class_under(rb_ePulsarError, "UnknownError", rb_ePulsarError); + rb_ePulsarError_InvalidConfiguration = rb_define_class_under(rb_ePulsarError, "InvalidConfiguration", rb_ePulsarError); + rb_ePulsarError_Timeout = rb_define_class_under(rb_ePulsarError, "Timeout", rb_ePulsarError); + rb_ePulsarError_LookupError = rb_define_class_under(rb_ePulsarError, "LookupError", rb_ePulsarError); + rb_ePulsarError_ConnectError = rb_define_class_under(rb_ePulsarError, "ConnectError", rb_ePulsarError); + rb_ePulsarError_ReadError = rb_define_class_under(rb_ePulsarError, "ReadError", rb_ePulsarError); + rb_ePulsarError_AuthenticationError = rb_define_class_under(rb_ePulsarError, "AuthenticationError", rb_ePulsarError); + rb_ePulsarError_AuthorizationError = rb_define_class_under(rb_ePulsarError, "AuthorizationError", rb_ePulsarError); + rb_ePulsarError_ErrorGettingAuthenticationData = rb_define_class_under(rb_ePulsarError, "ErrorGettingAuthenticationData", rb_ePulsarError); + rb_ePulsarError_BrokerMetadataError = rb_define_class_under(rb_ePulsarError, "BrokerMetadataError", rb_ePulsarError); + rb_ePulsarError_BrokerPersistenceError = rb_define_class_under(rb_ePulsarError, "BrokerPersistenceError", rb_ePulsarError); + rb_ePulsarError_ChecksumError = rb_define_class_under(rb_ePulsarError, "ChecksumError", rb_ePulsarError); + rb_ePulsarError_ConsumerBusy = rb_define_class_under(rb_ePulsarError, "ConsumerBusy", rb_ePulsarError); + rb_ePulsarError_NotConnected = rb_define_class_under(rb_ePulsarError, "NotConnected", rb_ePulsarError); + rb_ePulsarError_AlreadyClosed = rb_define_class_under(rb_ePulsarError, "AlreadyClosed", rb_ePulsarError); + rb_ePulsarError_InvalidMessage = rb_define_class_under(rb_ePulsarError, "InvalidMessage", rb_ePulsarError); + rb_ePulsarError_ConsumerNotInitialized = rb_define_class_under(rb_ePulsarError, "ConsumerNotInitialized", rb_ePulsarError); + rb_ePulsarError_ProducerNotInitialized = rb_define_class_under(rb_ePulsarError, "ProducerNotInitialized", rb_ePulsarError); + rb_ePulsarError_ProducerBusy = rb_define_class_under(rb_ePulsarError, "ProducerBusy", rb_ePulsarError); + rb_ePulsarError_TooManyLookupRequestException = rb_define_class_under(rb_ePulsarError, "TooManyLookupRequestException", rb_ePulsarError); + rb_ePulsarError_InvalidTopicName = rb_define_class_under(rb_ePulsarError, "InvalidTopicName", rb_ePulsarError); + rb_ePulsarError_InvalidUrl = rb_define_class_under(rb_ePulsarError, "InvalidUrl", rb_ePulsarError); + rb_ePulsarError_ServiceUnitNotReady = rb_define_class_under(rb_ePulsarError, "ServiceUnitNotReady", rb_ePulsarError); + rb_ePulsarError_OperationNotSupported = rb_define_class_under(rb_ePulsarError, "OperationNotSupported", rb_ePulsarError); + rb_ePulsarError_ProducerBlockedQuotaExceededError = rb_define_class_under(rb_ePulsarError, "ProducerBlockedQuotaExceededError", rb_ePulsarError); + rb_ePulsarError_ProducerBlockedQuotaExceededException = rb_define_class_under(rb_ePulsarError, "ProducerBlockedQuotaExceededException", rb_ePulsarError); + rb_ePulsarError_ProducerQueueIsFull = rb_define_class_under(rb_ePulsarError, "ProducerQueueIsFull", rb_ePulsarError); + rb_ePulsarError_MessageTooBig = rb_define_class_under(rb_ePulsarError, "MessageTooBig", rb_ePulsarError); + rb_ePulsarError_TopicNotFound = rb_define_class_under(rb_ePulsarError, "TopicNotFound", rb_ePulsarError); + rb_ePulsarError_SubscriptionNotFound = rb_define_class_under(rb_ePulsarError, "SubscriptionNotFound", rb_ePulsarError); + rb_ePulsarError_ConsumerNotFound = rb_define_class_under(rb_ePulsarError, "ConsumerNotFound", rb_ePulsarError); + rb_ePulsarError_UnsupportedVersionError = rb_define_class_under(rb_ePulsarError, "UnsupportedVersionError", rb_ePulsarError); + rb_ePulsarError_TopicTerminated = rb_define_class_under(rb_ePulsarError, "TopicTerminated", rb_ePulsarError); + rb_ePulsarError_CryptoError = rb_define_class_under(rb_ePulsarError, "CryptoError", rb_ePulsarError); + rb_ePulsarError_IncompatibleSchema = rb_define_class_under(rb_ePulsarError, "IncompatibleSchema", rb_ePulsarError); + rb_ePulsarError_ConsumerAssignError = rb_define_class_under(rb_ePulsarError, "ConsumerAssignError", rb_ePulsarError); + rb_ePulsarError_CumulativeAcknowledgementNotAllowedError = rb_define_class_under(rb_ePulsarError, "CumulativeAcknowledgementNotAllowedError", rb_ePulsarError); + rb_ePulsarError_TransactionCoordinatorNotFoundError = rb_define_class_under(rb_ePulsarError, "TransactionCoordinatorNotFoundError", rb_ePulsarError); + rb_ePulsarError_InvalidTxnStatusError = rb_define_class_under(rb_ePulsarError, "InvalidTxnStatusError", rb_ePulsarError); + rb_ePulsarError_NotAllowedError = rb_define_class_under(rb_ePulsarError, "NotAllowedError", rb_ePulsarError); +} + +void CheckResult(pulsar::Result res) { + if (res != pulsar::ResultOk) { + switch(res) { + case pulsar::ResultUnknownError: + throw Exception(rb_ePulsarError_UnknownError, "Unknown error happened on broker"); break; + case pulsar::ResultInvalidConfiguration: + throw Exception(rb_ePulsarError_InvalidConfiguration, "Invalid configuration"); break; + case pulsar::ResultTimeout: + throw Exception(rb_ePulsarError_Timeout, "Operation timed out"); break; + case pulsar::ResultLookupError: + throw Exception(rb_ePulsarError_LookupError, "Broker lookup failed"); break; + case pulsar::ResultConnectError: + throw Exception(rb_ePulsarError_ConnectError, "Failed to connect to broker"); break; + case pulsar::ResultReadError: + throw Exception(rb_ePulsarError_ReadError, "Failed to read from socket"); break; + case pulsar::ResultAuthenticationError: + throw Exception(rb_ePulsarError_AuthenticationError, "Authentication failed on broker"); break; + case pulsar::ResultAuthorizationError: + throw Exception(rb_ePulsarError_AuthorizationError, "Client is not authorized to create producer/consumer"); break; + case pulsar::ResultErrorGettingAuthenticationData: + throw Exception(rb_ePulsarError_ErrorGettingAuthenticationData, "Client cannot find authorization data"); break; + case pulsar::ResultBrokerMetadataError: + throw Exception(rb_ePulsarError_BrokerMetadataError, "Broker failed in updating metadata"); break; + case pulsar::ResultBrokerPersistenceError: + throw Exception(rb_ePulsarError_BrokerPersistenceError, "Broker failed to persist entry"); break; + case pulsar::ResultChecksumError: + throw Exception(rb_ePulsarError_ChecksumError, "Corrupt message checksum failure"); break; + case pulsar::ResultConsumerBusy: + throw Exception(rb_ePulsarError_ConsumerBusy, "Exclusive consumer is already connected"); break; + case pulsar::ResultNotConnected: + throw Exception(rb_ePulsarError_NotConnected, "Producer/Consumer is not currently connected to broker"); break; + case pulsar::ResultAlreadyClosed: + throw Exception(rb_ePulsarError_AlreadyClosed, "Producer/Consumer is already closed and not accepting any operation"); break; + case pulsar::ResultInvalidMessage: + throw Exception(rb_ePulsarError_InvalidMessage, "Error in publishing an already used message"); break; + case pulsar::ResultConsumerNotInitialized: + throw Exception(rb_ePulsarError_ConsumerNotInitialized, "Consumer is not initialized"); break; + case pulsar::ResultProducerNotInitialized: + throw Exception(rb_ePulsarError_ProducerNotInitialized, "Producer is not initialized"); break; + case pulsar::ResultProducerBusy: + throw Exception(rb_ePulsarError_ProducerBusy, "Producer with same name is already connected"); break; + case pulsar::ResultTooManyLookupRequestException: + throw Exception(rb_ePulsarError_TooManyLookupRequestException, "Too Many concurrent LookupRequest"); break; + case pulsar::ResultInvalidTopicName: + throw Exception(rb_ePulsarError_InvalidTopicName, "Invalid topic name"); break; + case pulsar::ResultInvalidUrl: + throw Exception(rb_ePulsarError_InvalidUrl, "Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)"); break; + case pulsar::ResultServiceUnitNotReady: + throw Exception(rb_ePulsarError_ServiceUnitNotReady, "Service Unit unloaded between client did lookup and producer/consumer got created"); break; + case pulsar::ResultOperationNotSupported: + throw Exception(rb_ePulsarError_OperationNotSupported, "Operation not supported"); break; + case pulsar::ResultProducerBlockedQuotaExceededError: + throw Exception(rb_ePulsarError_ProducerBlockedQuotaExceededError, "Producer is blocked"); break; + case pulsar::ResultProducerBlockedQuotaExceededException: + throw Exception(rb_ePulsarError_ProducerBlockedQuotaExceededException, "Producer is getting exception"); break; + case pulsar::ResultProducerQueueIsFull: + throw Exception(rb_ePulsarError_ProducerQueueIsFull, "Producer queue is full"); break; + case pulsar::ResultMessageTooBig: + throw Exception(rb_ePulsarError_MessageTooBig, "Trying to send a messages exceeding the max size"); break; + case pulsar::ResultTopicNotFound: + throw Exception(rb_ePulsarError_TopicNotFound, "Topic not found"); break; + case pulsar::ResultSubscriptionNotFound: + throw Exception(rb_ePulsarError_SubscriptionNotFound, "Subscription not found"); break; + case pulsar::ResultConsumerNotFound: + throw Exception(rb_ePulsarError_ConsumerNotFound, "Consumer not found"); break; + case pulsar::ResultUnsupportedVersionError: + throw Exception(rb_ePulsarError_UnsupportedVersionError, "Error when an older client/version doesn't support a required feature"); break; + case pulsar::ResultTopicTerminated: + throw Exception(rb_ePulsarError_TopicTerminated, "Topic was already terminated"); break; + case pulsar::ResultCryptoError: + throw Exception(rb_ePulsarError_CryptoError, "Error when crypto operation fails"); break; + case pulsar::ResultIncompatibleSchema: + throw Exception(rb_ePulsarError_IncompatibleSchema, "Specified schema is incompatible with the topic's schema"); break; + case pulsar::ResultConsumerAssignError: + throw Exception(rb_ePulsarError_ConsumerAssignError, "Error when a new consumer connected but can't assign messages to this consumer"); break; + case pulsar::ResultCumulativeAcknowledgementNotAllowedError: + throw Exception(rb_ePulsarError_CumulativeAcknowledgementNotAllowedError, "Not allowed to call cumulativeAcknowledgement in Shared and Key_Shared subscription mode"); break; + case pulsar::ResultTransactionCoordinatorNotFoundError: + throw Exception(rb_ePulsarError_TransactionCoordinatorNotFoundError, "Transaction coordinator not found"); break; + case pulsar::ResultInvalidTxnStatusError: + throw Exception(rb_ePulsarError_InvalidTxnStatusError, "Invalid txn status error"); break; + case pulsar::ResultNotAllowedError: + throw Exception(rb_ePulsarError_NotAllowedError, "Not allowed"); break; + default: + throw Exception(rb_ePulsarError, "unexpected pulsar exception: %d", res); + } + } +} diff --git a/ext/bindings/util.hpp b/ext/bindings/util.hpp new file mode 100644 index 0000000..c3eaf7b --- /dev/null +++ b/ext/bindings/util.hpp @@ -0,0 +1,13 @@ +#ifndef __PULSAR_RUBY_CLIENT_UTIL_HPP +#define __PULSAR_RUBY_CLIENT_UTIL_HPP + +#include "rice/Module.hpp" +#include + +using namespace pulsar; + +void CheckResult(Result res); + +void bind_errors(Rice::Module& module); + +#endif diff --git a/ext/bindings/vector.cpp b/ext/bindings/vector.cpp new file mode 100644 index 0000000..12b99cc --- /dev/null +++ b/ext/bindings/vector.cpp @@ -0,0 +1,16 @@ +#include "rice/Array.hpp" + +#include "vector.hpp" + +template<> +std::vector from_ruby>(Rice::Object o) +{ + Rice::Array a(o); + std::vector v; + Rice::Array::iterator it = a.begin(); + Rice::Array::iterator end = a.end(); + for(; it != end; ++it) { + v.push_back(it->to_s().str()); + } + return v; +} diff --git a/ext/bindings/vector.hpp b/ext/bindings/vector.hpp new file mode 100644 index 0000000..43a853e --- /dev/null +++ b/ext/bindings/vector.hpp @@ -0,0 +1,10 @@ +#ifndef __PULSAR_RUBY_CLIENT_VECTOR_HPP +#define __PULSAR_RUBY_CLIENT_VECTOR_HPP + +#include "rice/Object.hpp" +#include + +template<> +std::vector from_ruby>(Rice::Object o); + +#endif diff --git a/lib/pulsar/client.rb b/lib/pulsar/client.rb new file mode 100644 index 0000000..431df43 --- /dev/null +++ b/lib/pulsar/client.rb @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/client/version' +require 'pulsar/bindings' +require 'pulsar/client_configuration' +require 'pulsar/consumer' +require 'pulsar/consumer_configuration' +require 'pulsar/producer' + +module Pulsar + class Client + module RubySideTweaks + def initialize(service_url, config=nil) + config = Pulsar::ClientConfiguration.from(config) + super(service_url, config) + end + + def create_producer(topic, config=nil) + config ||= Pulsar::ProducerConfiguration.new + super(topic, config) + end + + def subscribe(topic, subscription_name, config={}) + unless config.is_a?(Pulsar::ConsumerConfiguration) + config = Pulsar::ConsumerConfiguration.new(config) + end + super(Array(topic), subscription_name, config) + end + end + + prepend RubySideTweaks + + def self.sufficient_environment?(config={}) + config[:broker_uri] || ENV['PULSAR_BROKER_URI'] + end + + def self.from_environment(config={}) + config = Pulsar::ClientConfiguration.from_environment(config) + broker_uri = config[:broker_uri] || ENV['PULSAR_BROKER_URI'] + new(broker_uri, config) + end + end +end diff --git a/lib/pulsar/client/version.rb b/lib/pulsar/client/version.rb new file mode 100644 index 0000000..3b45810 --- /dev/null +++ b/lib/pulsar/client/version.rb @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Pulsar + class Client + VERSION = "2.6.1-beta.2" + end +end diff --git a/lib/pulsar/client_configuration.rb b/lib/pulsar/client_configuration.rb new file mode 100644 index 0000000..9466ee1 --- /dev/null +++ b/lib/pulsar/client_configuration.rb @@ -0,0 +1,152 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/bindings' + +module Pulsar + class ClientConfiguration + AUTH_TOKEN_PLUGIN_NAME = 'org.apache.pulsar.client.impl.auth.AuthenticationToken'.freeze + CLIENT_CONF_KEY_VALUE_SEPARATOR = ': ' + FILE_URL_PREFIX = 'file://'.freeze + + def self.from(config) + case config + when self then config # already a config object + when nil then self.new # empty (all defaults) config object + when Hash then self.new(config) # config object from hash + else raise ArgumentError + end + end + + # Creates a client configuration from a custom and environment configuration + # (the environment configuration defaults to OS's ENV) + def self.from_environment(config={}, environment=ENV.to_h) + environment_config = {} + if environment.has_key?('PULSAR_CERT_PATH') + environment_config[:use_tls] = true + environment_config[:tls_allow_insecure_connection] = false + environment_config[:tls_validate_hostname] = false + environment_config[:tls_trust_certs_file_path] = environment['PULSAR_CERT_PATH'] + end + if environment.has_key?('PULSAR_AUTH_TOKEN') + environment_config[:authentication_token] = environment['PULSAR_AUTH_TOKEN'] + end + if environment.has_key?('PULSAR_CLIENT_CONF') + environment_config.merge!(read_from_client_conf(environment['PULSAR_CLIENT_CONF'])) + end + self.from(environment_config.merge(config)) + end + + # Load configuration from PULSAR_CLIENT_CONF + # refer to (see https://github.com/apache/pulsar/blob/master/conf/client.conf) + def self.read_from_client_conf(pulsar_client_conf_file) + client_config = {} + # Read the client config as a Ruby Hashmap + pulsar_config = read_config(pulsar_client_conf_file) + # Attempt to read as many configuration as possible from the client configuration + + # If a TLS certificate had been given, use it + if pulsar_config.has_key? 'tlsTrustCertsFilePath' + client_config[:use_tls] = true + client_config[:tls_trust_certs_file_path] = pulsar_config['tlsTrustCertsFilePath'] + end + # If 'TLS enable hostname verification' is false, then switch it off in config + if (pulsar_config.has_key? 'tlsEnableHostnameVerification') && + pulsar_config['tlsEnableHostnameVerification'].eql?('false') + client_config[:tls_validate_hostname] = false + end + # If 'TLS allow insecure connection' is false, then switch it off in config + if (pulsar_config.has_key? 'tlsAllowInsecureConnection') && + pulsar_config['tlsAllowInsecureConnection'].eql?('false') + client_config[:tls_allow_insecure_connection] = false + end + # If token-based authentication is used, load the token + # Currently only loading from files is supported + if (pulsar_config['authPlugin'].eql? AUTH_TOKEN_PLUGIN_NAME) && + pulsar_config['authParams'].start_with?(FILE_URL_PREFIX) + # read the first line from the token file + token_file = pulsar_config['authParams'].gsub(FILE_URL_PREFIX, '') + begin + token_file_lines = File.readlines(token_file) + # store the token to the config + client_config[:authentication_token] = token_file_lines[0] + rescue Errno::ENOENT => e + warn("Could not load token file '#{token_file}'. Exception was #{e}.") + end + end + # If we have a broker service URI, use it + client_config[:broker_uri] = pulsar_config['brokerServiceUrl'] if pulsar_config.has_key? 'brokerServiceUrl' + client_config + end + + # read Pulsar client configuration as a Hashmap from a client configuration file + def self.read_config(pulsar_config_file) + result = [] + begin + config_file_content = File.readlines(pulsar_config_file) + result = config_file_content.map do |line| + key_value = line.split(CLIENT_CONF_KEY_VALUE_SEPARATOR, 2) + [key_value[0], key_value[1].strip] + end + rescue Errno::ENOENT => e + warn("Could not load client config file '#{pulsar_config_file}'. Exception was #{e}.") + end + return result.to_h + end + + module RubySideTweaks + def initialize(config={}) + super() + # store client configuration inside class + @config = config + populate(config) + end + end + + prepend RubySideTweaks + + # serve the client configuration to peer objects + # (eg. we need to configure the 'broker URI' in a different way) + def [](key) + @config[key] + end + + def populate(config={}) + populate_one(config, :authentication_token) + populate_one(config, :operation_timeout_seconds) + populate_one(config, :io_threads) + populate_one(config, :message_listener_threads) + populate_one(config, :concurrent_lookup_requests) + populate_one(config, :log_conf_file_path) + populate_one(config, :silent_logging) + populate_one(config, :use_tls) + populate_one(config, :tls_trust_certs_file_path) + populate_one(config, :tls_allow_insecure_connection) + populate_one(config, :tls_validate_hostname) + end + + def populate_one(config, key) + if config.key?(key) + self.send(:"#{key}=", config[key]) + elsif config.key?(key.to_s) + self.send(:"#{key}=", config[key.to_s]) + end + end + end +end diff --git a/lib/pulsar/consumer.rb b/lib/pulsar/consumer.rb new file mode 100644 index 0000000..e938444 --- /dev/null +++ b/lib/pulsar/consumer.rb @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/bindings' + +module Pulsar + class Consumer + class ListenerToken + def initialize + @active = true + end + + def finish + @active = false + end + + def active? + @active + end + end + + def listen + listener = ListenerToken.new + while listener.active? + msg = receive + yield msg.data, msg.message_id, lambda { listener.finish } + acknowledge(msg) + end + end + + def listen_in_thread + Thread.new { listen {|*args| yield *args }} + end + end +end diff --git a/lib/pulsar/consumer_configuration.rb b/lib/pulsar/consumer_configuration.rb new file mode 100644 index 0000000..801e181 --- /dev/null +++ b/lib/pulsar/consumer_configuration.rb @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/client/version' +require 'pulsar/bindings' +require 'pulsar/client_configuration' +require 'pulsar/consumer' +require 'pulsar/producer' + +module Pulsar + class ConsumerConfiguration + # aligns with the pulsar::ConsumerType enum in the C++ library + CONSUMER_TYPES = { + :exclusive => Pulsar::ConsumerType::Exclusive, + :shared => Pulsar::ConsumerType::Shared, + :failover => Pulsar::ConsumerType::Failover, + :key_shared => Pulsar::ConsumerType::KeyShared + } + + # aligns with the pulsar::InitialPosition enum in the C++ library + INITIAL_POSITIONS = { + :latest => Pulsar::InitialPosition::Latest, + :earliest => Pulsar::InitialPosition::Earliest + } + + module RubySideTweaks + def initialize(config={}) + super() + self.consumer_type = config[:consumer_type] if config.has_key?(:consumer_type) + end + + def consumer_type + enum_value = super + CONSUMER_TYPES.invert[enum_value] + end + + def consumer_type=(type) + unless type.is_a?(Pulsar::ConsumerType) + type = CONSUMER_TYPES[type] + unless type + raise ArgumentError, "unrecognized consumer_type" + end + end + super(type) + end + + def subscription_initial_position + enum_value = super + INITIAL_POSITIONS.invert[enum_value] + end + + def subscription_initial_position=(type) + unless type.is_a?(Pulsar::InitialPosition) + type = INITIAL_POSITIONS[type] + unless type + raise ArgumentError, "unrecognized subscription_initial_position" + end + end + super(type) + end + end + + prepend RubySideTweaks + end +end diff --git a/lib/pulsar/producer.rb b/lib/pulsar/producer.rb new file mode 100644 index 0000000..9bfad1b --- /dev/null +++ b/lib/pulsar/producer.rb @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require 'pulsar/bindings' + +module Pulsar + class Producer + module RubySideTweaks + def send(message, **opts) + unless message.is_a?(Pulsar::Message) + message = Pulsar::Message.new(message, opts) + else + warn "Ignoring options (#{opts.keys.join(", ")}) since message is already a Pulsar::Message" unless opts.empty? + end + + super(message) + end + end + + prepend RubySideTweaks + end +end diff --git a/pulsar-client.gemspec b/pulsar-client.gemspec new file mode 100644 index 0000000..2d387df --- /dev/null +++ b/pulsar-client.gemspec @@ -0,0 +1,31 @@ +lib = File.expand_path("../lib", __FILE__) +$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) +require "pulsar/client/version" + +Gem::Specification.new do |spec| + spec.name = "pulsar-client" + spec.version = Pulsar::Client::VERSION + spec.date = "2019-10-03" + spec.summary = "Apache Pulsar Ruby Client" + spec.description = "Wraps the Apache Pulsar C++ Client with Ruby bindings." + spec.authors = ["Jacob Fugal"] + spec.email = ["lukfugl@gmail.com"] + # once merged upstream to apache, we'll rename the homepage + spec.homepage = "https://github.com/instructure/pulsar-client-ruby" + spec.license = "Apache-2.0" + + spec.files = `git ls-files -z`.split("\x0").reject do |f| + f.match(%r{^(test|spec|features)/}) + end + spec.bindir = "exe" + spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } + + spec.extensions = ["ext/bindings/extconf.rb"] + + spec.add_development_dependency "bundler", "~> 1.16" + spec.add_development_dependency "rake", "~> 10.0" + spec.add_development_dependency "rspec", "~> 3.0" + + spec.add_dependency "rake-compiler", "~> 1.0" + spec.add_dependency "rice", "~> 2.1" +end diff --git a/spec/pulsar/client_configuration_spec.rb b/spec/pulsar/client_configuration_spec.rb new file mode 100644 index 0000000..3d12b1f --- /dev/null +++ b/spec/pulsar/client_configuration_spec.rb @@ -0,0 +1,194 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +require 'tempfile' + +RSpec.describe Pulsar::ClientConfiguration do + it 'use authentication token from environment' do + test_env = { + 'PULSAR_AUTH_TOKEN' => 'example.token.abcdef123' + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:authentication_token]).to eq('example.token.abcdef123') + end + + it 'use client cert from environment' do + test_env = { + 'PULSAR_CERT_PATH' => '/path/to/cert.pem' + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:use_tls]).to eq(true) + expect(config[:tls_allow_insecure_connection]).to eq(false) + expect(config[:tls_validate_hostname]).to eq(false) + expect(config[:tls_trust_certs_file_path]).to eq('/path/to/cert.pem') + end + + it 'do not use authentication token when plugin is not set' do + test_token = create_temp_config( + 'example.token.abcdef123' + ) + test_config = create_temp_config( + "authParams: file://#{test_token.path}" + ) + begin + test_env = { + 'PULSAR_CLIENT_CONF' => test_config.path + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:authentication_token]).to eq(nil) + ensure + test_config.unlink + test_token.unlink + end + end + + it 'overwrite token with client configuration' do + test_token = create_temp_config( + 'example.token.abcdef123.from.file' + ) + test_config = create_temp_config( + "authPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken\n"\ + "authParams: file://#{test_token.path}" + ) + begin + test_env = { + 'PULSAR_AUTH_TOKEN' => 'example.token.abcdef123.from.env.shall.be.overwritten', + 'PULSAR_CLIENT_CONF' => test_config.path + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:authentication_token]).to eq('example.token.abcdef123.from.file') + ensure + test_config.unlink + test_token.unlink + end + end + + it 'overwrite certificate with client configuration' do + test_config = create_temp_config( + "tlsTrustCertsFilePath: /cert/file/from/client/config/ca.pem\n" + ) + begin + test_env = { + 'PULSAR_CERT_PATH' => '/path/to/a/cert/that/shall/not/be/used', + 'PULSAR_CLIENT_CONF' => test_config.path + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:tls_trust_certs_file_path]).to eq('/cert/file/from/client/config/ca.pem') + ensure + test_config.unlink + end + end + + it 'load all supported configuration from configuration file' do + test_token = create_temp_config( + 'example.token.abcdef123.from.file' + ) + test_config = create_temp_config( + "authPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken\n"\ + "authParams: file://#{test_token.path}\n"\ + "brokerServiceUrl: pulsar+ssl://test.broker.uri:6651\n"\ + "tlsTrustCertsFilePath: /test/cert/file/ca.pem\n"\ + "tlsAllowInsecureConnection: false\n"\ + "tlsEnableHostnameVerification: false\n" + ) + begin + test_env = { + 'PULSAR_CLIENT_CONF' => test_config.path + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:authentication_token]).to eq('example.token.abcdef123.from.file') + expect(config[:broker_uri]).to eq('pulsar+ssl://test.broker.uri:6651') + expect(config[:tls_allow_insecure_connection]).to eq(false) + expect(config[:tls_validate_hostname]).to eq(false) + expect(config[:tls_trust_certs_file_path]).to eq('/test/cert/file/ca.pem') + expect(config[:use_tls]).to eq(true) + ensure + test_config.unlink + test_token.unlink + end + end + + it 'handle when configuration file is not found without exceptions' do + test_env = { + 'PULSAR_CLIENT_CONF' => '/this/file/does/not/exists' + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:authentication_token]).to eq(nil) + expect(config[:broker_uri]).to eq(nil) + expect(config[:tls_allow_insecure_connection]).to eq(nil) + expect(config[:tls_validate_hostname]).to eq(nil) + expect(config[:tls_trust_certs_file_path]).to eq(nil) + expect(config[:use_tls]).to eq(nil) + end + + it 'handle when token file is not found without exceptions' do + test_config = create_temp_config( + "authPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken\n"\ + "authParams: file:///missing/token/file.txt" + ) + begin + test_env = { + 'PULSAR_CLIENT_CONF' => test_config.path + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:authentication_token]).to eq(nil) + ensure + test_config.unlink + end + end + + it 'do not load auth params with incorrect prefix' do + # the authParams has a missing schema in this config + test_config = create_temp_config( + 'authPlugin: org.apache.pulsar.client.impl.auth.AuthenticationToken\n'\ + 'authParams: /missing/token/file.txt' + ) + begin + test_env = { + 'PULSAR_CLIENT_CONF' => test_config.path + } + config = Pulsar::ClientConfiguration.from_environment({}, test_env) + expect(config[:authentication_token]).to eq(nil) + ensure + test_config.unlink + end + end + + describe "silent logging" do + it "is off by default" do + expect(Pulsar::ClientConfiguration.new.silent_logging?).to eq(false) + end + + it "can be enabled" do + expect(Pulsar::ClientConfiguration.from({silent_logging: true}).silent_logging?).to eq(true) + end + + it "raises when set twice" do + expect { + Pulsar::ClientConfiguration.from({silent_logging: true}).silent_logging = false + }.to raise_error(ArgumentError, /silent_logging can only be set once/) + end + end +end + +def create_temp_config(content) + file = Tempfile.new('pulsar-ruby-client-configuration-test') + file.write(content) + file.close + file +end diff --git a/spec/pulsar/client_spec.rb b/spec/pulsar/client_spec.rb new file mode 100644 index 0000000..f15fed6 --- /dev/null +++ b/spec/pulsar/client_spec.rb @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +RSpec.describe Pulsar::Client do + it "has a version number" do + expect(Pulsar::Client::VERSION).not_to be nil + end + + context 'with pulsar' do + let(:broker_uri) { ENV['PULSAR_BROKER_URI'] } + let(:namespace) { ENV['PULSAR_CLIENT_RUBY_TEST_NAMESPACE'].to_s.sub(%r{^[-a-z]:/+}, '') } + let(:configured?) { broker_uri && !namespace.empty? } + let(:client) { Pulsar::Client.from_environment(broker_uri: broker_uri, silent_logging: !!ENV['PULSAR_CLIENT_SILENT_LOGGING']) } + let(:topic) { "non-persistent://#{namespace}/test#{sprintf "%06d", rand(1_000_000)}" } + let(:producer) { client.create_producer(topic) } + let(:subscription_name) { "#{topic}-consumer" } + let(:timeout_ms) { 10_000 } + + before(:each) do + skip('Live Pulsar tests not configured: Set PULSAR_CLIENT_RUBY_TEST_NAMESPACE to enable') unless configured? + end + + after(:each) do + # Close any producers/consumers to avoid test pollution. + client.close + end + + it "can consume a single topic" do + consumer = client.subscribe(topic, subscription_name) + t = Thread.new { consumer.receive(timeout_ms) } + client.create_producer(topic).send("single") + message = t.join.value + + expect(message.data).to eq("single") + expect(message.topic).to eq(topic) + end + + it "can consume multiple topics" do + topics = [topic, "#{topic}.2"] + consumer = client.subscribe(topics, subscription_name) + t = Thread.new { topics.map { consumer.receive(timeout_ms).data } } + topics.each.with_index do |t, i| + client.create_producer(t).send("#{t} #{i}") + end + expect(t.join.value).to eq(topics.map.with_index { |t, i| "#{t} #{i}" }) + end + + it "errors with zero topics" do + expect { + client.subscribe([], subscription_name) + }.to raise_error(ArgumentError, /at least one topic/) + end + end +end diff --git a/spec/pulsar/consumer_configuration_spec.rb b/spec/pulsar/consumer_configuration_spec.rb new file mode 100644 index 0000000..5a8e9da --- /dev/null +++ b/spec/pulsar/consumer_configuration_spec.rb @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +RSpec.describe Pulsar::ConsumerConfiguration do + it 'can have subscription state modulated with enum values' do + config = Pulsar::ConsumerConfiguration.new({}) + config.subscription_initial_position = :earliest + expect(config.subscription_initial_position).to eq(:earliest) + end +end \ No newline at end of file diff --git a/spec/pulsar/ext/bindings.cpp b/spec/pulsar/ext/bindings.cpp new file mode 100644 index 0000000..1749778 --- /dev/null +++ b/spec/pulsar/ext/bindings.cpp @@ -0,0 +1,139 @@ +#include "rice/Module.hpp" +#include "../../../ext/bindings/util.hpp" + +using namespace Rice; + +extern "C" + +void Init_bindings(); +void check_result_Ok(); +void check_result_UnknownError(); +void check_result_InvalidConfiguration(); +void check_result_Timeout(); +void check_result_LookupError(); +void check_result_ConnectError(); +void check_result_ReadError(); +void check_result_AuthenticationError(); +void check_result_AuthorizationError(); +void check_result_ErrorGettingAuthenticationData(); +void check_result_BrokerMetadataError(); +void check_result_BrokerPersistenceError(); +void check_result_ChecksumError(); +void check_result_ConsumerBusy(); +void check_result_NotConnected(); +void check_result_AlreadyClosed(); +void check_result_InvalidMessage(); +void check_result_ConsumerNotInitialized(); +void check_result_ProducerNotInitialized(); +void check_result_ProducerBusy(); +void check_result_TooManyLookupRequestException(); +void check_result_InvalidTopicName(); +void check_result_InvalidUrl(); +void check_result_ServiceUnitNotReady(); +void check_result_OperationNotSupported(); +void check_result_ProducerBlockedQuotaExceededError(); +void check_result_ProducerBlockedQuotaExceededException(); +void check_result_ProducerQueueIsFull(); +void check_result_MessageTooBig(); +void check_result_TopicNotFound(); +void check_result_SubscriptionNotFound(); +void check_result_ConsumerNotFound(); +void check_result_UnsupportedVersionError(); +void check_result_TopicTerminated(); +void check_result_CryptoError(); +void check_result_IncompatibleSchema(); +void check_result_ConsumerAssignError(); +void check_result_CumulativeAcknowledgementNotAllowedError(); +void check_result_TransactionCoordinatorNotFoundError(); +void check_result_InvalidTxnStatusError(); +void check_result_NotAllowedError(); + +void Init_bindings() +{ + Module rb_mPulsarTestExt = define_module("PulsarTestExt") + .define_singleton_method("check_result_Ok", &check_result_Ok) + .define_singleton_method("check_result_UnknownError", &check_result_UnknownError) + .define_singleton_method("check_result_InvalidConfiguration", &check_result_InvalidConfiguration) + .define_singleton_method("check_result_Timeout", &check_result_Timeout) + .define_singleton_method("check_result_LookupError", &check_result_LookupError) + .define_singleton_method("check_result_ConnectError", &check_result_ConnectError) + .define_singleton_method("check_result_ReadError", &check_result_ReadError) + .define_singleton_method("check_result_AuthenticationError", &check_result_AuthenticationError) + .define_singleton_method("check_result_AuthorizationError", &check_result_AuthorizationError) + .define_singleton_method("check_result_ErrorGettingAuthenticationData", &check_result_ErrorGettingAuthenticationData) + .define_singleton_method("check_result_BrokerMetadataError", &check_result_BrokerMetadataError) + .define_singleton_method("check_result_BrokerPersistenceError", &check_result_BrokerPersistenceError) + .define_singleton_method("check_result_ChecksumError", &check_result_ChecksumError) + .define_singleton_method("check_result_ConsumerBusy", &check_result_ConsumerBusy) + .define_singleton_method("check_result_NotConnected", &check_result_NotConnected) + .define_singleton_method("check_result_AlreadyClosed", &check_result_AlreadyClosed) + .define_singleton_method("check_result_InvalidMessage", &check_result_InvalidMessage) + .define_singleton_method("check_result_ConsumerNotInitialized", &check_result_ConsumerNotInitialized) + .define_singleton_method("check_result_ProducerNotInitialized", &check_result_ProducerNotInitialized) + .define_singleton_method("check_result_ProducerBusy", &check_result_ProducerBusy) + .define_singleton_method("check_result_TooManyLookupRequestException", &check_result_TooManyLookupRequestException) + .define_singleton_method("check_result_InvalidTopicName", &check_result_InvalidTopicName) + .define_singleton_method("check_result_InvalidUrl", &check_result_InvalidUrl) + .define_singleton_method("check_result_ServiceUnitNotReady", &check_result_ServiceUnitNotReady) + .define_singleton_method("check_result_OperationNotSupported", &check_result_OperationNotSupported) + .define_singleton_method("check_result_ProducerBlockedQuotaExceededError", &check_result_ProducerBlockedQuotaExceededError) + .define_singleton_method("check_result_ProducerBlockedQuotaExceededException", &check_result_ProducerBlockedQuotaExceededException) + .define_singleton_method("check_result_ProducerQueueIsFull", &check_result_ProducerQueueIsFull) + .define_singleton_method("check_result_MessageTooBig", &check_result_MessageTooBig) + .define_singleton_method("check_result_TopicNotFound", &check_result_TopicNotFound) + .define_singleton_method("check_result_SubscriptionNotFound", &check_result_SubscriptionNotFound) + .define_singleton_method("check_result_ConsumerNotFound", &check_result_ConsumerNotFound) + .define_singleton_method("check_result_UnsupportedVersionError", &check_result_UnsupportedVersionError) + .define_singleton_method("check_result_TopicTerminated", &check_result_TopicTerminated) + .define_singleton_method("check_result_CryptoError", &check_result_CryptoError) + .define_singleton_method("check_result_IncompatibleSchema", &check_result_IncompatibleSchema) + .define_singleton_method("check_result_ConsumerAssignError", &check_result_ConsumerAssignError) + .define_singleton_method("check_result_CumulativeAcknowledgementNotAllowedError", &check_result_CumulativeAcknowledgementNotAllowedError) + .define_singleton_method("check_result_TransactionCoordinatorNotFoundError", &check_result_TransactionCoordinatorNotFoundError) + .define_singleton_method("check_result_InvalidTxnStatusError", &check_result_InvalidTxnStatusError) + .define_singleton_method("check_result_NotAllowedError", &check_result_NotAllowedError) + ; + +} + +void check_result_Ok() { CheckResult(pulsar::ResultOk); } +void check_result_UnknownError() { CheckResult(pulsar::ResultUnknownError); } +void check_result_InvalidConfiguration() { CheckResult(pulsar::ResultInvalidConfiguration); } +void check_result_Timeout() { CheckResult(pulsar::ResultTimeout); } +void check_result_LookupError() { CheckResult(pulsar::ResultLookupError); } +void check_result_ConnectError() { CheckResult(pulsar::ResultConnectError); } +void check_result_ReadError() { CheckResult(pulsar::ResultReadError); } +void check_result_AuthenticationError() { CheckResult(pulsar::ResultAuthenticationError); } +void check_result_AuthorizationError() { CheckResult(pulsar::ResultAuthorizationError); } +void check_result_ErrorGettingAuthenticationData() { CheckResult(pulsar::ResultErrorGettingAuthenticationData); } +void check_result_BrokerMetadataError() { CheckResult(pulsar::ResultBrokerMetadataError); } +void check_result_BrokerPersistenceError() { CheckResult(pulsar::ResultBrokerPersistenceError); } +void check_result_ChecksumError() { CheckResult(pulsar::ResultChecksumError); } +void check_result_ConsumerBusy() { CheckResult(pulsar::ResultConsumerBusy); } +void check_result_NotConnected() { CheckResult(pulsar::ResultNotConnected); } +void check_result_AlreadyClosed() { CheckResult(pulsar::ResultAlreadyClosed); } +void check_result_InvalidMessage() { CheckResult(pulsar::ResultInvalidMessage); } +void check_result_ConsumerNotInitialized() { CheckResult(pulsar::ResultConsumerNotInitialized); } +void check_result_ProducerNotInitialized() { CheckResult(pulsar::ResultProducerNotInitialized); } +void check_result_ProducerBusy() { CheckResult(pulsar::ResultProducerBusy); } +void check_result_TooManyLookupRequestException() { CheckResult(pulsar::ResultTooManyLookupRequestException); } +void check_result_InvalidTopicName() { CheckResult(pulsar::ResultInvalidTopicName); } +void check_result_InvalidUrl() { CheckResult(pulsar::ResultInvalidUrl); } +void check_result_ServiceUnitNotReady() { CheckResult(pulsar::ResultServiceUnitNotReady); } +void check_result_OperationNotSupported() { CheckResult(pulsar::ResultOperationNotSupported); } +void check_result_ProducerBlockedQuotaExceededError() { CheckResult(pulsar::ResultProducerBlockedQuotaExceededError); } +void check_result_ProducerBlockedQuotaExceededException() { CheckResult(pulsar::ResultProducerBlockedQuotaExceededException); } +void check_result_ProducerQueueIsFull() { CheckResult(pulsar::ResultProducerQueueIsFull); } +void check_result_MessageTooBig() { CheckResult(pulsar::ResultMessageTooBig); } +void check_result_TopicNotFound() { CheckResult(pulsar::ResultTopicNotFound); } +void check_result_SubscriptionNotFound() { CheckResult(pulsar::ResultSubscriptionNotFound); } +void check_result_ConsumerNotFound() { CheckResult(pulsar::ResultConsumerNotFound); } +void check_result_UnsupportedVersionError() { CheckResult(pulsar::ResultUnsupportedVersionError); } +void check_result_TopicTerminated() { CheckResult(pulsar::ResultTopicTerminated); } +void check_result_CryptoError() { CheckResult(pulsar::ResultCryptoError); } +void check_result_IncompatibleSchema() { CheckResult(pulsar::ResultIncompatibleSchema); } +void check_result_ConsumerAssignError() { CheckResult(pulsar::ResultConsumerAssignError); } +void check_result_CumulativeAcknowledgementNotAllowedError() { CheckResult(pulsar::ResultCumulativeAcknowledgementNotAllowedError); } +void check_result_TransactionCoordinatorNotFoundError() { CheckResult(pulsar::ResultTransactionCoordinatorNotFoundError); } +void check_result_InvalidTxnStatusError() { CheckResult(pulsar::ResultInvalidTxnStatusError); } +void check_result_NotAllowedError() { CheckResult(pulsar::ResultNotAllowedError); } diff --git a/spec/pulsar/ext/extconf.rb b/spec/pulsar/ext/extconf.rb new file mode 100644 index 0000000..0aa5276 --- /dev/null +++ b/spec/pulsar/ext/extconf.rb @@ -0,0 +1,3 @@ +require 'mkmf-rice' +$CXXFLAGS += ' -std=c++11 ' +create_makefile('bindings') diff --git a/spec/pulsar/ext_spec.rb b/spec/pulsar/ext_spec.rb new file mode 100644 index 0000000..051d7b2 --- /dev/null +++ b/spec/pulsar/ext_spec.rb @@ -0,0 +1,166 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "rbconfig" + +RSpec.describe "pulsar ext" do + RUBY = RbConfig::CONFIG['RUBY_INSTALL_NAME'] + EXT_DIR = File.join(__dir__, "ext") + + before(:all) do + Dir.chdir(EXT_DIR) do + system("#{RUBY} ./extconf.rb") + system("make clean all") + end + require_relative "#{File.join(".", "ext", "bindings")}" + end + + describe "errors" do + it "inherits from Pulsar::Error" do + expect(Pulsar::Error::UnknownError.new.is_a?(Pulsar::Error)).to be true + end + end + + describe "check_result" do + it "doesn't raise for ok" do + expect { PulsarTestExt.check_result_Ok }.not_to raise_exception + end + + it "raises UnknownError" do + expect { PulsarTestExt.check_result_UnknownError }.to raise_exception(Pulsar::Error::UnknownError) + end + it "raises InvalidConfiguration" do + expect { PulsarTestExt.check_result_InvalidConfiguration }.to raise_exception(Pulsar::Error::InvalidConfiguration) + end + it "raises Timeout" do + expect { PulsarTestExt.check_result_Timeout }.to raise_exception(Pulsar::Error::Timeout) + end + it "raises LookupError" do + expect { PulsarTestExt.check_result_LookupError }.to raise_exception(Pulsar::Error::LookupError) + end + it "raises ConnectError" do + expect { PulsarTestExt.check_result_ConnectError }.to raise_exception(Pulsar::Error::ConnectError) + end + it "raises ReadError" do + expect { PulsarTestExt.check_result_ReadError }.to raise_exception(Pulsar::Error::ReadError) + end + it "raises AuthenticationError" do + expect { PulsarTestExt.check_result_AuthenticationError }.to raise_exception(Pulsar::Error::AuthenticationError) + end + it "raises AuthorizationError" do + expect { PulsarTestExt.check_result_AuthorizationError }.to raise_exception(Pulsar::Error::AuthorizationError) + end + it "raises ErrorGettingAuthenticationData" do + expect { PulsarTestExt.check_result_ErrorGettingAuthenticationData }.to raise_exception(Pulsar::Error::ErrorGettingAuthenticationData) + end + it "raises BrokerMetadataError" do + expect { PulsarTestExt.check_result_BrokerMetadataError }.to raise_exception(Pulsar::Error::BrokerMetadataError) + end + it "raises BrokerPersistenceError" do + expect { PulsarTestExt.check_result_BrokerPersistenceError }.to raise_exception(Pulsar::Error::BrokerPersistenceError) + end + it "raises ChecksumError" do + expect { PulsarTestExt.check_result_ChecksumError }.to raise_exception(Pulsar::Error::ChecksumError) + end + it "raises ConsumerBusy" do + expect { PulsarTestExt.check_result_ConsumerBusy }.to raise_exception(Pulsar::Error::ConsumerBusy) + end + it "raises NotConnected" do + expect { PulsarTestExt.check_result_NotConnected }.to raise_exception(Pulsar::Error::NotConnected) + end + it "raises AlreadyClosed" do + expect { PulsarTestExt.check_result_AlreadyClosed }.to raise_exception(Pulsar::Error::AlreadyClosed) + end + it "raises InvalidMessage" do + expect { PulsarTestExt.check_result_InvalidMessage }.to raise_exception(Pulsar::Error::InvalidMessage) + end + it "raises ConsumerNotInitialized" do + expect { PulsarTestExt.check_result_ConsumerNotInitialized }.to raise_exception(Pulsar::Error::ConsumerNotInitialized) + end + it "raises ProducerNotInitialized" do + expect { PulsarTestExt.check_result_ProducerNotInitialized }.to raise_exception(Pulsar::Error::ProducerNotInitialized) + end + it "raises ProducerBusy" do + expect { PulsarTestExt.check_result_ProducerBusy }.to raise_exception(Pulsar::Error::ProducerBusy) + end + it "raises TooManyLookupRequestException" do + expect { PulsarTestExt.check_result_TooManyLookupRequestException }.to raise_exception(Pulsar::Error::TooManyLookupRequestException) + end + it "raises InvalidTopicName" do + expect { PulsarTestExt.check_result_InvalidTopicName }.to raise_exception(Pulsar::Error::InvalidTopicName) + end + it "raises InvalidUrl" do + expect { PulsarTestExt.check_result_InvalidUrl }.to raise_exception(Pulsar::Error::InvalidUrl) + end + it "raises ServiceUnitNotReady" do + expect { PulsarTestExt.check_result_ServiceUnitNotReady }.to raise_exception(Pulsar::Error::ServiceUnitNotReady) + end + it "raises OperationNotSupported" do + expect { PulsarTestExt.check_result_OperationNotSupported }.to raise_exception(Pulsar::Error::OperationNotSupported) + end + it "raises ProducerBlockedQuotaExceededError" do + expect { PulsarTestExt.check_result_ProducerBlockedQuotaExceededError }.to raise_exception(Pulsar::Error::ProducerBlockedQuotaExceededError) + end + it "raises ProducerBlockedQuotaExceededException" do + expect { PulsarTestExt.check_result_ProducerBlockedQuotaExceededException }.to raise_exception(Pulsar::Error::ProducerBlockedQuotaExceededException) + end + it "raises ProducerQueueIsFull" do + expect { PulsarTestExt.check_result_ProducerQueueIsFull }.to raise_exception(Pulsar::Error::ProducerQueueIsFull) + end + it "raises MessageTooBig" do + expect { PulsarTestExt.check_result_MessageTooBig }.to raise_exception(Pulsar::Error::MessageTooBig) + end + it "raises TopicNotFound" do + expect { PulsarTestExt.check_result_TopicNotFound }.to raise_exception(Pulsar::Error::TopicNotFound) + end + it "raises SubscriptionNotFound" do + expect { PulsarTestExt.check_result_SubscriptionNotFound }.to raise_exception(Pulsar::Error::SubscriptionNotFound) + end + it "raises ConsumerNotFound" do + expect { PulsarTestExt.check_result_ConsumerNotFound }.to raise_exception(Pulsar::Error::ConsumerNotFound) + end + it "raises UnsupportedVersionError" do + expect { PulsarTestExt.check_result_UnsupportedVersionError }.to raise_exception(Pulsar::Error::UnsupportedVersionError) + end + it "raises TopicTerminated" do + expect { PulsarTestExt.check_result_TopicTerminated }.to raise_exception(Pulsar::Error::TopicTerminated) + end + it "raises CryptoError" do + expect { PulsarTestExt.check_result_CryptoError }.to raise_exception(Pulsar::Error::CryptoError) + end + it "raises IncompatibleSchema" do + expect { PulsarTestExt.check_result_IncompatibleSchema }.to raise_exception(Pulsar::Error::IncompatibleSchema) + end + it "raises ConsumerAssignError" do + expect { PulsarTestExt.check_result_ConsumerAssignError }.to raise_exception(Pulsar::Error::ConsumerAssignError) + end + it "raises CumulativeAcknowledgementNotAllowedError" do + expect { PulsarTestExt.check_result_CumulativeAcknowledgementNotAllowedError }.to raise_exception(Pulsar::Error::CumulativeAcknowledgementNotAllowedError) + end + it "raises TransactionCoordinatorNotFoundError" do + expect { PulsarTestExt.check_result_TransactionCoordinatorNotFoundError }.to raise_exception(Pulsar::Error::TransactionCoordinatorNotFoundError) + end + it "raises InvalidTxnStatusError" do + expect { PulsarTestExt.check_result_InvalidTxnStatusError }.to raise_exception(Pulsar::Error::InvalidTxnStatusError) + end + it "raises NotAllowedError" do + expect { PulsarTestExt.check_result_NotAllowedError }.to raise_exception(Pulsar::Error::NotAllowedError) + end + end +end diff --git a/spec/pulsar/message_spec.rb b/spec/pulsar/message_spec.rb new file mode 100644 index 0000000..1cbae86 --- /dev/null +++ b/spec/pulsar/message_spec.rb @@ -0,0 +1,134 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +RSpec.describe Pulsar::Message do + describe "::new" do + it "takes a string" do + m = described_class.new("payload") + expect(m.data).to eq("payload") + end + + describe "topic" do + it "is nil when built rather than received" do + m = described_class.new("payload") + expect(m.topic).to be_nil + end + end + + describe "properties" do + it "takes properties" do + m = described_class.new("payload", properties: {"a" => "1", "b" => "2"}) + expect(m.data).to eq("payload") + expect(m.properties).to eq({"a" => "1", "b" => "2"}) + end + + it "stringifies non-string properties" do + m = described_class.new("payload", properties: { + "a" => 1, + :b => [2], + }) + expect(m.data).to eq("payload") + expect(m.properties).to eq({"a" => "1", "b" => "[2]"}) + end + + it "takes a lot of properties" do + m = described_class.new("payload", properties: { + "a" => 1, + "b" => [2], + "c" => ("c" * 100000), + "license" => File.read(File.expand_path("../../LICENSE", __dir__)), + }) + expect(m.data).to eq("payload") + expect(m.properties["license"]).to match(/Apache License/) + expect(m.properties["license"]).to match(/limitations under the License/) + expect(m.properties["c"]).to match(/^c{100000}$/) + expect(m.properties.values_at("a", "b")).to eq(["1", "[2]"]) + end + + it "accepts nil properties" do + m = described_class.new("payload", properties: nil) + expect(m.properties).to eq({}) + end + end + + describe "partition_key" do + it "defaults to blank string" do + m = described_class.new("payload") + expect(m.partition_key).to eq("") + end + + it "accepts partition key" do + m = described_class.new("payload", partition_key: "foo") + expect(m.partition_key).to eq("foo") + end + + it "accepts nil key" do + m = described_class.new("payload", partition_key: nil) + expect(m.partition_key).to eq("") + end + + it "stringifies partition key" do + m = described_class.new("payload", partition_key: :bar) + expect(m.partition_key).to eq("bar") + end + end + + describe "ordering_key" do + it "defaults to blank string" do + m = described_class.new("payload") + expect(m.ordering_key).to eq("") + end + + it "accepts ordering key" do + m = described_class.new("payload", ordering_key: "foo") + expect(m.ordering_key).to eq("foo") + end + + it "accepts nil ordering key" do + m = described_class.new("payload", ordering_key: nil) + expect(m.ordering_key).to eq("") + end + + it "stringifies ordering key" do + m = described_class.new("payload", ordering_key: ["o"]) + expect(m.ordering_key).to eq(%(["o"])) + end + end + + describe "errors" do + it "rejects second arg that is not a hash" do + expect do + described_class.new("payload", [1]) + end.to raise_exception(TypeError) + end + + it "rejects unknown named arguments" do + expect do + described_class.new("payload", properties: {}, foo: "x") + end.to raise_exception(ArgumentError, /Unknown keyword argument: foo/) + end + + it "rejects properties that are not a hash" do + expect do + described_class.new("payload", properties: []) + end.to raise_exception(TypeError) + end + end + end +end diff --git a/spec/pulsar/producer_spec.rb b/spec/pulsar/producer_spec.rb new file mode 100644 index 0000000..ae7bd43 --- /dev/null +++ b/spec/pulsar/producer_spec.rb @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +RSpec.describe Pulsar::Producer do + describe "side tweaks" do + describe "#send" do + class ProducerTest + def send(message) + message + end + prepend Pulsar::Producer::RubySideTweaks + end + + context "message" do + it "passes Message through" do + m = Pulsar::Message.new("payload") + expect(Pulsar::Message).not_to receive(:new) + expect(ProducerTest.new.send(m).data).to eq("payload") + end + + it "warns about unused arguments" do + m = Pulsar::Message.new("payload") + test = ProducerTest.new + expect(test).to receive(:warn).with(matching(/Ignoring options \((properties|foo), (properties|foo)\)/)) + expect(test.send(m, properties: {k: "v"}, foo: "bar").data).to eq("payload") + end + end + + it "creates Message from single string arg" do + m = ProducerTest.new.send("payload") + expect(m.data).to eq("payload") + end + + describe "options" do + subject { + ProducerTest.new.send( + "payload", + properties: {"k" => "v"}, + partition_key: "foo", + ordering_key: "mine", + ) + } + + it "sets data" do + expect(subject.data).to eq("payload") + end + + it "sets properties" do + expect(subject.properties).to eq({"k" => "v"}) + end + + it "sets partition_key" do + expect(subject.partition_key).to eq("foo") + end + + it "sets ordering_key" do + expect(subject.ordering_key).to eq("mine") + end + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..a9ba201 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +require "pulsar/client" + +RSpec.configure do |config| + # Enable flags like --only-failures and --next-failure + config.example_status_persistence_file_path = ".rspec_status" + + # Disable RSpec exposing methods globally on `Module` and `main` + config.disable_monkey_patching! + + config.expect_with :rspec do |c| + c.syntax = :expect + end +end