Skip to content

Commit b890600

Browse files
authored
Fix JSON paths (#30243)
1 parent 977caf6 commit b890600

File tree

25 files changed

+1072
-273
lines changed

25 files changed

+1072
-273
lines changed

ydb/core/formats/arrow/accessor/common/binary_json_value_view.cpp

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,34 @@
33
#include <ydb/library/actors/core/log.h>
44
#include <yql/essentials/types/binary_json/read.h>
55
#include <util/generic/ylimits.h>
6+
#include <util/string/cast.h>
67

78
#include <limits>
89
#include <cmath>
910

1011
namespace NKikimr::NArrow::NAccessor {
1112

12-
namespace {
13-
14-
std::optional<std::string> JsonNumberToString(double val) {
15-
if (std::isnan(val)) {
13+
std::optional<TString> TBinaryJsonValueView::JsonNumberToString(double jsonNumber) {
14+
if (std::isnan(jsonNumber)) {
1615
return std::nullopt;
1716
}
1817

1918
double integerPart;
20-
double fractionPart = std::modf(val, &integerPart);
19+
double fractionPart = std::modf(jsonNumber, &integerPart);
2120
if (!(fractionPart == 0.0)) {
22-
return std::to_string(val);
21+
return ::ToString(jsonNumber);
2322
}
2423

2524
static constexpr double minD = static_cast<double>(std::numeric_limits<i64>::min());
2625
static constexpr double maxD = MaxFloor<i64>();
2726

28-
if (minD <= val && val <= maxD) {
29-
return std::to_string(static_cast<i64>(val));
27+
if (minD <= jsonNumber && jsonNumber <= maxD) {
28+
return ::ToString(static_cast<i64>(jsonNumber));
3029
}
3130

32-
return std::to_string(val);
31+
return ::ToString(jsonNumber);
3332
}
3433

35-
} // namespace
36-
3734
TBinaryJsonValueView::TBinaryJsonValueView(const TStringBuf& rawValue)
3835
: RawValue(rawValue) {
3936
if (!RawValue.empty()) {

ydb/core/formats/arrow/accessor/common/binary_json_value_view.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class TBinaryJsonValueView {
1212

1313
std::optional<TStringBuf> GetScalarOptional() const;
1414

15+
static std::optional<TString> JsonNumberToString(double jsonNumber);
16+
1517
private:
1618
TStringBuf RawValue;
1719
mutable TString ScalarHolder;

ydb/core/formats/arrow/accessor/composite_serial/accessor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ IChunkedArray::TLocalChunkedArrayAddress TDeserializeChunkedArray::DoGetLocalChu
1818
}
1919
if (!!Data) {
2020
auto result = Loader->ApplyConclusion(Data, GetRecordsCount());
21-
Y_ABORT_UNLESS(result, "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
21+
Y_ABORT_UNLESS(result.IsSuccess(), "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
2222
return TLocalChunkedArrayAddress(result.DetachResult(), 0, 0);
2323
} else {
2424
AFL_VERIFY(!!DataBuffer);
2525
auto result = Loader->ApplyConclusion(TString(DataBuffer.data(), DataBuffer.size()), GetRecordsCount());
26-
Y_ABORT_UNLESS(result, "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
26+
Y_ABORT_UNLESS(result.IsSuccess(), "Incorrect object for result request. internal path id: %s portion id: %" PRIu64 " error: %s ", InternalPathId.data(), PortionId, result.GetErrorMessage().data());
2727
return TLocalChunkedArrayAddress(result.DetachResult(), 0, 0);
2828
}
2929
}

ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp

Lines changed: 42 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22
#include "direct_builder.h"
33
#include "signals.h"
44

5+
#include <util/generic/overloaded.h>
56
#include <ydb/core/formats/arrow/accessor/composite_serial/accessor.h>
67
#include <ydb/core/formats/arrow/accessor/plain/constructor.h>
8+
#include <ydb/core/formats/arrow/accessor/sub_columns/json_value_path.h>
79
#include <ydb/core/formats/arrow/save_load/loader.h>
810
#include <ydb/core/formats/arrow/size_calcer.h>
911
#include <ydb/core/formats/arrow/splitter/simple.h>
1012

1113
#include <ydb/library/formats/arrow/protos/accessor.pb.h>
1214
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
1315

16+
#include <yql/essentials/minikql/jsonpath/parser/parser.h>
1417
#include <yql/essentials/types/binary_json/format.h>
1518
#include <yql/essentials/types/binary_json/write.h>
1619

@@ -113,103 +116,16 @@ TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& extern
113116
return result;
114117
}
115118

116-
class TJsonRestorer {
117-
private:
118-
NJson::TJsonValue Result;
119-
120-
public:
121-
bool IsNull() const {
122-
return !Result.IsDefined();
123-
}
124-
125-
TConclusion<NBinaryJson::TBinaryJson> Finish() {
126-
auto bJson = NBinaryJson::SerializeToBinaryJson(Result.GetStringRobust());
127-
if (const TString* val = std::get_if<TString>(&bJson)) {
128-
return TConclusionStatus::Fail(*val);
129-
} else if (const NBinaryJson::TBinaryJson* val = std::get_if<NBinaryJson::TBinaryJson>(&bJson)) {
130-
return std::move(*val);
131-
} else {
132-
return TConclusionStatus::Fail("undefined case for binary json construction");
133-
}
134-
}
135-
136-
void SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue) {
137-
ui32 start = 0;
138-
bool enqueue = false;
139-
bool wasEnqueue = false;
140-
NJson::TJsonValue* current = &Result;
141-
for (ui32 i = 0; i < path.size(); ++i) {
142-
if (path[i] == '\\') {
143-
++i;
144-
continue;
145-
}
146-
if (path[i] == '\'' || path[i] == '\"') {
147-
wasEnqueue = true;
148-
enqueue = !enqueue;
149-
continue;
150-
}
151-
if (enqueue) {
152-
continue;
153-
}
154-
if (path[i] == '.') {
155-
if (wasEnqueue) {
156-
AFL_VERIFY(i > start + 2);
157-
TStringBuf key(path.data() + start + 1, (i - 1) - start - 1);
158-
NJson::TJsonValue* currentNext = nullptr;
159-
if (current->GetValuePointer(key, &currentNext)) {
160-
current = currentNext;
161-
} else {
162-
current = &current->InsertValue(key, NJson::JSON_MAP);
163-
}
164-
} else {
165-
AFL_VERIFY(i > start);
166-
TStringBuf key(path.data() + start, i - start);
167-
NJson::TJsonValue* currentNext = nullptr;
168-
if (current->GetValuePointer(key, &currentNext)) {
169-
current = currentNext;
170-
} else {
171-
ui32 keyIndex;
172-
if (key.StartsWith("[") && key.EndsWith("]") && TryFromString<ui32>(key.data() + 1, key.size() - 2, keyIndex)) {
173-
AFL_VERIFY(!current->IsDefined() || current->IsArray() || (current->IsMap() && current->GetMapSafe().empty()));
174-
current->SetType(NJson::JSON_ARRAY);
175-
if (current->GetArraySafe().size() <= keyIndex) {
176-
current->GetArraySafe().resize(keyIndex + 1);
177-
}
178-
current = &current->GetArraySafe()[keyIndex];
179-
} else {
180-
AFL_VERIFY(!current->IsArray())("current_type", current->GetType())("current", current->GetStringRobust());
181-
current = &current->InsertValue(key, NJson::JSON_MAP);
182-
}
183-
}
184-
}
185-
wasEnqueue = false;
186-
start = i + 1;
187-
}
188-
}
189-
if (wasEnqueue) {
190-
AFL_VERIFY(path.size() > start + 2)("path", path)("start", start);
191-
TStringBuf key(path.data() + start + 1, (path.size() - 1) - start - 1);
192-
current->InsertValue(key, jsonValue);
193-
} else {
194-
AFL_VERIFY(path.size() >= start)("path", path)("start", start);
195-
TStringBuf key(path.data() + start, (path.size()) - start);
196-
ui32 keyIndex;
197-
if (key.StartsWith("[") && key.EndsWith("]") && TryFromString<ui32>(key.data() + 1, key.size() - 2, keyIndex)) {
198-
AFL_VERIFY(!current->IsDefined() || current->IsArray() || (current->IsMap() && current->GetMapSafe().empty()));
199-
current->SetType(NJson::JSON_ARRAY);
200-
201-
if (current->GetArraySafe().size() <= keyIndex) {
202-
current->GetArraySafe().resize(keyIndex + 1);
203-
}
204-
current->GetArraySafe()[keyIndex] = jsonValue;
205-
} else {
206-
AFL_VERIFY(!current->IsArray())("key", key)("current", current->GetStringRobust())("full", Result.GetStringRobust())(
207-
"current_type", current->GetType());
208-
current->InsertValue(key, jsonValue);
209-
}
210-
}
211-
}
212-
};
119+
TConclusion<NBinaryJson::TBinaryJson> ToBinaryJson(const TJsonRestorer& restorer) {
120+
return std::visit(TOverloaded{
121+
[](TString&& val) -> TConclusion<NBinaryJson::TBinaryJson> {
122+
return TConclusionStatus::Fail(std::move(val));
123+
},
124+
[](NBinaryJson::TBinaryJson&& val) -> TConclusion<NBinaryJson::TBinaryJson> {
125+
return std::move(val);
126+
}},
127+
NBinaryJson::SerializeToBinaryJson(restorer.GetResult().GetStringRobust()));
128+
}
213129

214130
std::shared_ptr<arrow::Array> TSubColumnsArray::BuildBJsonArray(const TColumnConstructionContext& context) const {
215131
auto it = BuildUnorderedIterator();
@@ -233,7 +149,7 @@ std::shared_ptr<arrow::Array> TSubColumnsArray::BuildBJsonArray(const TColumnCon
233149
if (value.IsNull()) {
234150
TStatusValidator::Validate(builder->AppendNull());
235151
} else {
236-
const TConclusion<NBinaryJson::TBinaryJson> bJson = value.Finish();
152+
const TConclusion<NBinaryJson::TBinaryJson> bJson = ToBinaryJson(value);
237153
NArrow::Append<arrow::BinaryType>(*builder, arrow::util::string_view(bJson->data(), bJson->size()));
238154
}
239155
};
@@ -268,4 +184,32 @@ IChunkedArray::TLocalDataAddress TSubColumnsArray::DoGetLocalData(
268184
return TLocalDataAddress(BuildBJsonArray(TColumnConstructionContext()), 0, 0);
269185
}
270186

187+
bool TJsonRestorer::IsNull() const {
188+
return !Result.IsDefined();
189+
}
190+
191+
const NJson::TJsonValue& TJsonRestorer::GetResult() const {
192+
return Result;
193+
}
194+
195+
void TJsonRestorer::SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue) {
196+
// Path may be empty (for backward compatibility), so make it $."" in this case
197+
auto splitResult = NSubColumns::SplitJsonPath(NSubColumns::ToJsonPath(path.empty() ? "\"\"" : path), NSubColumns::TJsonPathSplitSettings{.FillTypes = true});
198+
AFL_VERIFY(splitResult.IsSuccess())("error", splitResult.GetErrorMessage())("path", path);
199+
const auto [pathItems, pathTypes, _] = splitResult.DetachResult();
200+
AFL_VERIFY(pathItems.size() > 0);
201+
AFL_VERIFY(pathItems.size() == pathTypes.size());
202+
NJson::TJsonValue* current = &Result;
203+
for (decltype(pathItems)::size_type i = 0; i < pathItems.size() - 1; ++i) {
204+
AFL_VERIFY(pathTypes[i] == NYql::NJsonPath::EJsonPathItemType::MemberAccess);
205+
NJson::TJsonValue* currentNext = nullptr;
206+
if (current->GetValuePointer(pathItems[i], &currentNext)) {
207+
current = currentNext;
208+
} else {
209+
current = &current->InsertValue(pathItems[i], NJson::JSON_MAP);
210+
}
211+
}
212+
current->InsertValue(pathItems[pathItems.size() - 1], jsonValue);
213+
}
214+
271215
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/sub_columns/accessor.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include <ydb/core/formats/arrow/accessor/abstract/accessor.h>
99
#include <ydb/core/formats/arrow/accessor/common/chunk_data.h>
10+
#include <ydb/core/formats/arrow/accessor/sub_columns/json_value_path.h>
1011
#include <ydb/core/formats/arrow/arrow_filter.h>
1112
#include <ydb/core/formats/arrow/arrow_helpers.h>
1213
#include <ydb/core/formats/arrow/common/container.h>
@@ -111,13 +112,25 @@ class TSubColumnsArray: public IChunkedArray {
111112
return nullptr;
112113
}
113114

114-
std::shared_ptr<IChunkedArray> GetPathAccessor(const std::string_view svPath, const ui32 recordsCount) const {
115+
TConclusion<std::shared_ptr<NSubColumns::TJsonPathAccessor>> GetPathAccessor(const std::string_view svPath, const ui32 recordsCount) const {
115116
auto accResult = ColumnsData.GetPathAccessor(svPath);
116-
if (accResult) {
117+
if (accResult.IsFail() || accResult.GetResult()->IsValid()) {
117118
return accResult;
118119
}
119120
return OthersData.GetPathAccessor(svPath, recordsCount);
120121
}
121122
};
122123

124+
class TJsonRestorer {
125+
private:
126+
NJson::TJsonValue Result;
127+
128+
public:
129+
bool IsNull() const;
130+
131+
const NJson::TJsonValue& GetResult() const;
132+
133+
void SetValueByPath(const TString& path, const NJson::TJsonValue& jsonValue);
134+
};
135+
123136
} // namespace NKikimr::NArrow::NAccessor

ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
2222
}
2323
records.DeleteFieldsByIndex(indexesToRemove);
2424
return TColumnsData(builder.Finish(), std::make_shared<TGeneralContainer>(std::move(records)));
25-
2625
} else {
2726
return TColumnsData(TDictStats::BuildEmpty(), std::make_shared<TGeneralContainer>(0));
2827
}

ydb/core/formats/arrow/accessor/sub_columns/columns_storage.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_binary.h>
1010
#include <ydb/core/formats/arrow/accessor/common/binary_json_value_view.h>
1111
#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h>
12+
#include <ydb/core/formats/arrow/accessor/sub_columns/json_value_path.h>
1213

1314
namespace NKikimr::NArrow::NAccessor::NSubColumns {
1415

@@ -18,13 +19,13 @@ class TColumnsData {
1819
YDB_READONLY_DEF(std::shared_ptr<TGeneralContainer>, Records);
1920

2021
public:
21-
std::shared_ptr<IChunkedArray> GetPathAccessor(const std::string_view path) const {
22-
auto idx = Stats.GetKeyIndexOptional(path);
23-
if (!idx) {
24-
return nullptr;
25-
} else {
26-
return Records->GetColumnVerified(*idx);
22+
TConclusion<std::shared_ptr<TJsonPathAccessor>> GetPathAccessor(const std::string_view path) const {
23+
auto jsonPathAccessorTrie = std::make_shared<NKikimr::NArrow::NAccessor::NSubColumns::TJsonPathAccessorTrie>();
24+
for (ui32 i = 0; i < Stats.GetColumnsCount(); ++i) {
25+
auto insertResult = jsonPathAccessorTrie->Insert(ToSubcolumnName(Stats.GetColumnName(i)), Records->GetColumnVerified(i));
26+
AFL_VERIFY(insertResult.IsSuccess())("error", insertResult.GetErrorMessage());
2727
}
28+
return jsonPathAccessorTrie->GetAccessor(path);
2829
}
2930

3031
NJson::TJsonValue DebugJson() const {

ydb/core/formats/arrow/accessor/sub_columns/data_extractor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ TConclusionStatus TJsonScanExtractor::DoAddDataToBuilders(const std::shared_ptr<
3232

3333
if (cursor.GetType() == NBinaryJson::EContainerType::Object) {
3434
iterators.push_back(std::make_unique<TKVExtractor>(cursor.GetObjectIterator(), TStringBuf(), FirstLevelOnly));
35-
} else if (cursor.GetType() == NBinaryJson::EContainerType::Array) {
36-
iterators.push_back(std::make_unique<TArrayExtractor>(cursor.GetArrayIterator(), TStringBuf(), FirstLevelOnly));
35+
} else {
36+
return TConclusionStatus::Fail("Only top-level objects are supported in JSON subcolumns");
3737
}
3838

3939
while (iterators.size()) {

ydb/core/formats/arrow/accessor/sub_columns/direct_builder.cpp

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "columns_storage.h"
33
#include "direct_builder.h"
44

5+
#include <util/string/escape.h>
56
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
67
#include <ydb/core/formats/arrow/accessor/sparsed/accessor.h>
78

@@ -121,19 +122,14 @@ TOthersData TDataBuilder::MergeOthers(const std::vector<TColumnElements*>& other
121122
}
122123

123124
std::string BuildString(const TStringBuf currentPrefix, const TStringBuf key) {
124-
if (key.find(".") != std::string::npos) {
125-
if (currentPrefix.size()) {
126-
return Sprintf("%.*s.\"%.*s\"", currentPrefix.size(), currentPrefix.data(), key.size(), key.data());
127-
} else {
128-
return Sprintf("\"%.*s\"", key.size(), key.data());
129-
}
130-
} else {
131-
if (currentPrefix.size()) {
132-
return Sprintf("%.*s.%.*s", currentPrefix.size(), currentPrefix.data(), key.size(), key.data());
133-
} else {
134-
return std::string(key.data(), key.size());
135-
}
125+
TStringBuilder builder;
126+
const auto escapedKey = QuoteJsonItem(key);
127+
if (currentPrefix.size()) {
128+
builder << currentPrefix << ".";
136129
}
130+
builder << escapedKey;
131+
132+
return builder;
137133
}
138134

139135
TStringBuf TDataBuilder::AddKeyOwn(const TStringBuf currentPrefix, std::string&& key) {

ydb/core/formats/arrow/accessor/sub_columns/json_extractors.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,14 @@ TConclusionStatus IJsonObjectExtractor::AddDataToBuilder(TDataBuilder& dataBuild
4747
auto container = value.GetContainer();
4848
if (FirstLevelOnly || container.GetType() == NBinaryJson::EContainerType::Array) {
4949
res = NBinaryJson::SerializeToBinaryJson(value);
50-
// TODO: add support for arrays if needed
51-
// } else if (container.GetType() == NBinaryJson::EContainerType::Array) {
52-
// iterators.emplace_back(std::make_unique<TArrayExtractor>(container.GetArrayIterator(), key));
53-
// addRes = false;
5450
} else if (container.GetType() == NBinaryJson::EContainerType::Object) {
55-
iterators.emplace_back(std::make_unique<TKVExtractor>(container.GetObjectIterator(), key));
56-
addRes = false;
51+
auto containerIt = container.GetObjectIterator();
52+
if (!containerIt.HasNext()) {
53+
res = NBinaryJson::SerializeToBinaryJson("{}");
54+
} else {
55+
iterators.emplace_back(std::make_unique<TKVExtractor>(containerIt, key));
56+
addRes = false;
57+
}
5758
} else {
5859
return TConclusionStatus::Fail("unexpected top value scalar in container iterator");
5960
}

0 commit comments

Comments
 (0)