@@ -67,6 +67,18 @@ Connection::~Connection() {
6767 }
6868}
6969
70+ Baton Connection::rdkafkaErrorToBaton (RdKafka::Error* error) {
71+ if ( NULL == error) {
72+ return Baton (RdKafka::ERR_NO_ERROR);
73+ }
74+ else {
75+ Baton result (error->code (), error->str (), error->is_fatal (),
76+ error->is_retriable (), error->txn_requires_abort ());
77+ delete error;
78+ return result;
79+ }
80+ }
81+
7082RdKafka::TopicPartition* Connection::GetPartition (std::string &topic) {
7183 return RdKafka::TopicPartition::create (topic, RdKafka::Topic::PARTITION_UA);
7284}
@@ -215,6 +227,25 @@ Baton Connection::GetMetadata(
215227 }
216228}
217229
230+ Baton Connection::SetSaslCredentials (
231+ std::string username, std::string password) {
232+ RdKafka::Error *error;
233+
234+ if (IsConnected ()) {
235+ scoped_shared_read_lock lock (m_connection_lock);
236+ if (IsConnected ()) {
237+ // Always send true - we
238+ error = m_client->sasl_set_credentials (username, password);
239+ } else {
240+ return Baton (RdKafka::ERR__STATE);
241+ }
242+ } else {
243+ return Baton (RdKafka::ERR__STATE);
244+ }
245+
246+ return rdkafkaErrorToBaton (error);
247+ }
248+
218249void Connection::ConfigureCallback (const std::string &string_key, const v8::Local<v8::Function> &cb, bool add) {
219250 if (string_key.compare (" event_cb" ) == 0 ) {
220251 if (add) {
@@ -337,6 +368,39 @@ NAN_METHOD(Connection::NodeQueryWatermarkOffsets) {
337368 info.GetReturnValue ().Set (Nan::Null ());
338369}
339370
371+ NAN_METHOD (Connection::NodeSetSaslCredentials) {
372+ if (!info[0 ]->IsString ()) {
373+ Nan::ThrowError (" 1st parameter must be a username string" );
374+ return ;
375+ }
376+
377+ if (!info[1 ]->IsString ()) {
378+ Nan::ThrowError (" 2nd parameter must be a password string" );
379+ return ;
380+ }
381+
382+ // Get string pointer for the username
383+ Nan::Utf8String usernameUTF8 (Nan::To<v8::String>(info[0 ]).ToLocalChecked ());
384+ // The first parameter is the username
385+ std::string username (*usernameUTF8);
386+
387+ // Get string pointer for the password
388+ Nan::Utf8String passwordUTF8 (Nan::To<v8::String>(info[1 ]).ToLocalChecked ());
389+ // The first parameter is the password
390+ std::string password (*passwordUTF8);
391+
392+ Connection* obj = ObjectWrap::Unwrap<Connection>(info.This ());
393+ Baton b = obj->SetSaslCredentials (username, password);
394+
395+ if (b.err () != RdKafka::ERR_NO_ERROR) {
396+ v8::Local<v8::Value> errorObject = b.ToObject ();
397+ return Nan::ThrowError (errorObject);
398+ }
399+
400+ info.GetReturnValue ().Set (Nan::Null ());
401+ }
402+
403+
340404// Node methods
341405NAN_METHOD (Connection::NodeConfigureCallbacks) {
342406 Nan::HandleScope scope;
0 commit comments