diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1da52c42..347d1b34 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,6 +35,12 @@ jobs: with: fetch-depth: 0 + - name: Set up Java + uses: actions/setup-java@v2 + with: + java-version: 17 + distribution: "temurin" + - name: Setup Ruby ${{ matrix.version }} uses: ruby/setup-ruby@v1 with: @@ -57,7 +63,7 @@ jobs: - name: SonarQube Scan (Push) if: matrix.version == '3.2.2' && github.event_name == 'push' - uses: SonarSource/sonarcloud-github-action@v1.9 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -69,13 +75,14 @@ jobs: - name: SonarQube Scan (Pull Request) if: matrix.version == '3.2.2' && github.event_name == 'pull_request' - uses: SonarSource/sonarcloud-github-action@v1.9 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: projectBaseDir: . args: > + -Dsonar.java.source=17 -Dsonar.host.url=${{ secrets.SONARQUBE_HOST }} -Dsonar.projectVersion=${{ env.VERSION }} -Dsonar.pullrequest.key=${{ github.event.pull_request.number }} diff --git a/CHANGES.txt b/CHANGES.txt index f899ea72..a9e4fc42 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,8 @@ CHANGES +8.10.0 (Nov 28, 2025) +- Replaced socketry gem used in streaming feature with built-in socket lib. + 8.9.0 (Oct 8, 2025) - Added new configuration for Fallback Treatments, which allows setting a treatment value and optional config to be returned in place of "control", either globally or by flag. Read more in our docs. diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 022894d6..08ac28ee 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -1,7 +1,9 @@ # frozen_string_literal: false -require 'socketry' +require 'socket' +require 'openssl' require 'uri' +require 'timeout' module SplitIoClient module SSE @@ -36,12 +38,15 @@ def initialize(config, def close(status = nil) unless connected? - @config.logger.error('SSEClient already disconected.') if @config.debug_enabled + @config.logger.debug('SSEClient already disconected.') return end + @config.logger.debug("Closing SSEClient socket") @connected.make_false - @socket&.close + @socket.sync_close = true if @socket.is_a? OpenSSL::SSL::SSLSocket + @socket.close + @config.logger.debug("SSEClient socket state #{@socket.state}") if @socket.is_a? OpenSSL::SSL::SSLSocket push_status(status) rescue StandardError => e @config.logger.error("SSEClient close Error: #{e.inspect}") @@ -55,7 +60,6 @@ def start(url) @uri = URI(url) latch = Concurrent::CountDownLatch.new(1) - connect_thread(latch) return false unless latch.wait(CONNECT_TIMEOUT) @@ -74,42 +78,73 @@ def connected? def connect_thread(latch) @config.threads[:connect_stream] = Thread.new do - @config.logger.info('Starting connect_stream thread ...') if @config.debug_enabled + @config.logger.info('Starting connect_stream thread ...') new_status = connect_stream(latch) push_status(new_status) - @config.logger.info('connect_stream thread finished.') if @config.debug_enabled + @config.logger.info('connect_stream thread finished.') end end def connect_stream(latch) return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch) - while connected? || @first_event.value - begin - partial_data = @socket.readpartial(10_000, timeout: @read_timeout) - - read_first_event(partial_data, latch) - - raise 'eof exception' if partial_data == :eof - rescue Errno::EBADF, IOError => e - @config.logger.error(e.inspect) if @config.debug_enabled - return nil - rescue StandardError => e - return nil if ENV['SPLITCLIENT_ENV'] == 'test' - - @config.logger.error("Error reading partial data: #{e.inspect}") if @config.debug_enabled - return Constants::PUSH_RETRYABLE_ERROR + begin + if IO.select([@socket], nil, nil, @read_timeout) + begin + partial_data = @socket.readpartial(10_000) + read_first_event(partial_data, latch) + + raise 'eof exception' if partial_data == :eof + rescue IO::WaitReadable => e + @config.logger.debug("SSE client IO::WaitReadable transient error: #{e.inspect}") + IO.select([@socket], nil, nil, @read_timeout) + retry + rescue Errno::EAGAIN => e + @config.logger.debug("SSE client transient error: #{e.inspect}") + IO.select([@socket], nil, nil, @read_timeout) + retry + rescue Errno::ETIMEDOUT => e + @config.logger.error("SSE read operation timed out!: #{e.inspect}") + return Constants::PUSH_RETRYABLE_ERROR + rescue EOFError => e + puts "SSE read operation EOF Exception!: #{e.inspect}" + @config.logger.error("SSE read operation EOF Exception!: #{e.inspect}") + raise 'eof exception' + rescue Errno::EBADF, IOError => e + @config.logger.error("SSE read operation EBADF or IOError: #{e.inspect}") + return Constants::PUSH_RETRYABLE_ERROR + rescue StandardError => e + @config.logger.error("SSE read operation StandardError: #{e.inspect}") + return nil if ENV['SPLITCLIENT_ENV'] == 'test' + + @config.logger.error("Error reading partial data: #{e.inspect}") + return Constants::PUSH_RETRYABLE_ERROR + end + else + @config.logger.error("SSE read operation timed out, no data available.") + return Constants::PUSH_RETRYABLE_ERROR + end + rescue Errno::EBADF + @config.logger.debug("SSE socket is not connected (Errno::EBADF)") + break + rescue RuntimeError + raise 'eof exception' + rescue Exception => e + @config.logger.debug("SSE socket is not connected: #{e.inspect}") + break end process_data(partial_data) end + @config.logger.info("SSE read operation exited: #{connected?}") + nil end def socket_write(latch) @first_event.make_true @socket = socket_connect - @socket.write(build_request(@uri)) + @socket.puts(build_request(@uri)) true rescue StandardError => e @config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}") @@ -130,6 +165,7 @@ def read_first_event(data, latch) if response_code == OK_CODE && !error_event @connected.make_true + @config.logger.debug("SSE client first event Connected is true") @telemetry_runtime_producer.record_streaming_event(Telemetry::Domain::Constants::SSE_CONNECTION_ESTABLISHED, nil) push_status(Constants::PUSH_CONNECTED) end @@ -138,15 +174,37 @@ def read_first_event(data, latch) end def socket_connect - return Socketry::SSL::Socket.connect(@uri.host, @uri.port) if @uri.scheme.casecmp('https').zero? + tcp_socket = TCPSocket.new(@uri.host, @uri.port) + if @uri.scheme.casecmp('https').zero? + begin + ssl_context = OpenSSL::SSL::SSLContext.new + ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) + ssl_socket.hostname = @uri.host + + begin + ssl_socket.connect_nonblock + rescue IO::WaitReadable + IO.select([ssl_socket]) + retry + rescue IO::WaitWritable + IO.select(nil, [ssl_socket]) + retry + end + return ssl_socket + + rescue Exception => e + @config.logger.error("socket connect error: #{e.inspect}") + return nil + end + end - Socketry::TCP::Socket.connect(@uri.host, @uri.port) + tcp_socket end def process_data(partial_data) + @config.logger.debug("Event partial data: #{partial_data}") return if partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE - @config.logger.debug("Event partial data: #{partial_data}") if @config.debug_enabled events = @event_parser.parse(partial_data) events.each { |event| process_event(event) } rescue StandardError => e @@ -162,7 +220,7 @@ def build_request(uri) req << "SplitSDKMachineName: #{@config.machine_name}\r\n" req << "SplitSDKClientKey: #{@api_key.split(//).last(4).join}\r\n" unless @api_key.nil? req << "Cache-Control: no-cache\r\n\r\n" - @config.logger.debug("Request info: #{req}") if @config.debug_enabled + @config.logger.debug("Request info: #{req}") req end diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index 1afbdd0a..d45f095e 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '8.9.0' + VERSION = '8.10.0' end diff --git a/sonar-project.properties b/sonar-project.properties index d98bd22f..ae9b0918 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -2,4 +2,5 @@ sonar.projectKey=ruby-client sonar.projectKey=ruby-client sonar.sources=lib sonar.tests=spec +sonar.java.source=17 sonar.ruby.coverage.reportPaths=coverage/.resultset.sonarqube.json diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index 1c199b09..95e0a1f2 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -2,6 +2,7 @@ require 'spec_helper' require 'http_server_mock' +require 'rspec/mocks' describe SplitIoClient::SSE::EventSource::Client do subject { SplitIoClient::SSE::EventSource::Client } @@ -221,6 +222,36 @@ end end + it 'client timeout and reconnect' do + stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=-1&rbSince=-1') + .with(headers: { 'Authorization' => 'Bearer client-spec-key' }) + .to_return(status: 200, body: '{"ff":{"d":[],"s":-1,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}') + stub_request(:get, 'https://sdk.split.io/api/splitChanges?s=1.3&since=5564531221&rbSince=-1') + .with(headers: { 'Authorization' => 'Bearer client-spec-key' }) + .to_return(status: 200, body: '{"ff":{"d":[],"s":5564531221,"t":5564531221}, "rbs":{"d":[],"s":-1,"t":-1}}') + + mock_server do |server| + start_workers + server.setup_response('/') do |_, res| + send_stream_content(res, event_split_update) + end + + sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue, read_timeout: 0.1) + connected = sse_client.start(server.base_uri) + sleep 1 + expect(connected).to eq(true) + expect(sse_client.connected?).to eq(true) + expect(push_status_queue.pop(true)).to eq(SplitIoClient::Constants::PUSH_CONNECTED) + sleep 3 + expect(log.string).to include 'SSE read operation timed out, no data available' + expect(sse_client.connected?).to eq(true) + sse_client.close + expect(sse_client.connected?).to eq(false) + + stop_workers + end + end + it 'first event - when server return 400' do mock_server do |server| server.setup_response('/') do |_, res| @@ -236,6 +267,95 @@ stop_workers end end + + it 'test exceptions' do + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_split_update) + end + start_workers + + sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) + + sse_client.instance_variable_set(:@uri, URI(server.base_uri)) + latch = Concurrent::CountDownLatch.new(1) + + allow(sse_client).to receive(:read_first_event).and_raise(Errno::ETIMEDOUT) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation timed out!' + + allow(sse_client).to receive(:read_first_event).and_raise(EOFError) + expect { sse_client.send(:connect_stream, latch) }.to raise_error(RuntimeError) + expect(log.string).to include 'SSE read operation EOF Exception!' + + allow(sse_client).to receive(:read_first_event).and_raise(Errno::EBADF) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation EBADF or IOError' + + allow(sse_client).to receive(:read_first_event).and_raise(IOError) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation EBADF or IOError' + + allow(sse_client).to receive(:read_first_event).and_raise(StandardError) + sse_client.send(:connect_stream, latch) + expect(log.string).to include 'SSE read operation StandardError:' + + stop_workers + end + end + + it 'test retry with EAGAIN exceptions' do + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_occupancy) + end + start_workers + + sse_client = subject.new(config, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) + + sse_client.instance_variable_set(:@uri, URI(server.base_uri)) + latch = Concurrent::CountDownLatch.new(1) + + allow(sse_client).to receive(:read_first_event).and_raise(Errno::EAGAIN) + sleep(1) + thr1 = Thread.new do + sse_client.send(:connect_stream, latch) + end + sleep(1) + allow(sse_client).to receive(:read_first_event).and_return(true) + expect(log.string).to include 'SSE client transient error' + + stop_workers + end + end + + it 'test retry with IO::WaitReadable exceptions' do + log2 = StringIO.new + config2 = SplitIoClient::SplitConfig.new(logger: Logger.new(log2)) + + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_occupancy) + end + start_workers + + sse_client2 = subject.new(config2, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) + + sse_client2.instance_variable_set(:@uri, URI(server.base_uri)) + latch = Concurrent::CountDownLatch.new(1) + + allow(sse_client2).to receive(:read_first_event).and_raise(IO::EWOULDBLOCKWaitReadable) + sleep(1) + thr2 = Thread.new do + sse_client2.send(:connect_stream, latch) + end + sleep(1) + allow(sse_client2).to receive(:read_first_event).and_return(true) + expect(log2.string).to include 'SSE client IO::WaitReadable transient error' + + stop_workers + end + end end private diff --git a/spec/sse/sse_handler_spec.rb b/spec/sse/sse_handler_spec.rb index 099f7932..96fb6141 100644 --- a/spec/sse/sse_handler_spec.rb +++ b/spec/sse/sse_handler_spec.rb @@ -48,7 +48,6 @@ config.streaming_service_url = server.base_uri sse_handler = subject.new(config, splits_worker, segments_worker, sse_client) - connected = sse_handler.start('token-test', 'channel-test') expect(connected).to eq(true) expect(sse_handler.connected?).to eq(true) diff --git a/splitclient-rb.gemspec b/splitclient-rb.gemspec index 3c818df9..742c6d3f 100644 --- a/splitclient-rb.gemspec +++ b/splitclient-rb.gemspec @@ -59,6 +59,5 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'lru_redux', '~> 1.1' spec.add_runtime_dependency 'net-http-persistent', '>= 2.9', '< 5.0' spec.add_runtime_dependency 'redis', '>= 4.0.0', '< 6.0' - spec.add_runtime_dependency 'socketry', '>= 0.4', '< 1.0' spec.add_runtime_dependency 'thread_safe', '~> 0.3' end