Skip to content

Commit

Permalink
introduce plan and matcher of row pattern recognition
Browse files Browse the repository at this point in the history
  • Loading branch information
Young-Leo committed Dec 13, 2024
1 parent 7e918c2 commit fda9edb
Show file tree
Hide file tree
Showing 62 changed files with 5,498 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern;

import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.ArrayView;

import org.apache.tsfile.read.common.block.TsBlock;

import java.util.List;

import static java.util.Objects.requireNonNull;

public class LabelEvaluator {
private final long matchNumber;

private final int patternStart;

// inclusive - the first row of the search partition
private final int partitionStart;

// inclusive - the first row of the partition area available for pattern search.
// this area is the whole partition in case of MATCH_RECOGNIZE, and the area enclosed
// by the common base frame in case of pattern recognition in WINDOW clause.
private final int searchStart;

// exclusive - the first row after the the partition area available for pattern search.
// this area is the whole partition in case of MATCH_RECOGNIZE, and the area enclosed
// by the common base frame in case of pattern recognition in WINDOW clause.
private final int searchEnd;

private final List<Evaluation> evaluations;

// private final ProjectingPagesWindowIndex windowIndex;

public LabelEvaluator(
long matchNumber,
int patternStart,
int partitionStart,
int searchStart,
int searchEnd,
List<Evaluation> evaluations
// ProjectingPagesWindowIndex windowIndex
) {
this.matchNumber = matchNumber;
this.patternStart = patternStart;
this.partitionStart = partitionStart;
this.searchStart = searchStart;
this.searchEnd = searchEnd;
this.evaluations = requireNonNull(evaluations, "evaluations is null");
// this.windowIndex = requireNonNull(windowIndex, "windowIndex is null");
}

public int getInputLength() {
return searchEnd - patternStart;
}

public boolean isMatchingAtPartitionStart() {
return patternStart == partitionStart;
}

// evaluate the last label in matchedLabels. It has been tentatively appended to the match
public boolean evaluateLabel(ArrayView matchedLabels) {
int label = matchedLabels.get(matchedLabels.length() - 1);
Evaluation evaluation = evaluations.get(label);
return evaluation.test(
matchedLabels,
// aggregations,
partitionStart,
searchStart,
searchEnd,
patternStart,
matchNumber
// windowIndex
);
}

public static class Evaluation {
// compiled computation of label-defining boolean expression
// private final PageProjection projection;

// value accessors ordered as expected by the compiled projection
private final List<PhysicalValueAccessor> expectedLayout;

// precomputed `Block`s with null values for every `PhysicalValuePointer` (see
// MeasureComputation)
private final TsBlock[] nulls;

// mapping from int representation to label name
private final List<String> labelNames;

// private final ConnectorSession session;

public Evaluation(
// PageProjection projection,
List<PhysicalValueAccessor> expectedLayout, List<String> labelNames
// ConnectorSession session
) {
// this.projection = requireNonNull(projection, "projection is null");
this.expectedLayout = requireNonNull(expectedLayout, "expectedLayout is null");
this.nulls = precomputeNulls(expectedLayout);
this.labelNames = requireNonNull(labelNames, "labelNames is null");
// this.session = requireNonNull(session, "session is null");
}

public static TsBlock[] precomputeNulls(List<PhysicalValueAccessor> expectedLayout) {
TsBlock[] nulls = new TsBlock[expectedLayout.size()];
for (int i = 0; i < expectedLayout.size(); i++) {
PhysicalValueAccessor accessor = expectedLayout.get(i);
if (accessor instanceof PhysicalValuePointer) {
// nulls[i] = nativeValueToBlock(((PhysicalValuePointer) accessor).getType(),
// null);
// TODO:
nulls[i] = new TsBlock(1);
}
}
return nulls;
}

public List<PhysicalValueAccessor> getExpectedLayout() {
return expectedLayout;
}

public boolean test(
ArrayView matchedLabels,
// MatchAggregation[] aggregations,
int partitionStart,
int searchStart,
int searchEnd,
int patternStart,
long matchNumber
// ProjectingPagesWindowIndex windowIndex
) {
int currentRow = patternStart + matchedLabels.length() - 1;

// TODO:
// Block result =
// compute(
// currentRow,
// matchedLabels,
// aggregations,
// partitionStart,
// searchStart,
// searchEnd,
// patternStart,
// matchNumber,
// windowIndex,
// projection,
// expectedLayout,
// nulls,
// labelNames,
// session);

// return BOOLEAN.getBoolean(result, 0);
return false;
}
}

// public static class EvaluationSupplier {
// private final Supplier<PageProjection> projection;
// private final List<PhysicalValueAccessor> expectedLayout;
// private final List<String> labelNames;
// private final ConnectorSession session;
//
// public EvaluationSupplier(
// Supplier<PageProjection> projection,
// List<PhysicalValueAccessor> expectedLayout,
// List<String> labelNames,
// ConnectorSession session) {
// this.projection = requireNonNull(projection, "projection is null");
// this.expectedLayout = requireNonNull(expectedLayout, "expectedLayout is null");
// this.labelNames = requireNonNull(labelNames, "labelNames is null");
// this.session = requireNonNull(session, "session is null");
// }
//
// public Evaluation get() {
// return new Evaluation(projection.get(), expectedLayout, labelNames, session);
// }
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern;

import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.ArrayView;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class LogicalIndexNavigation {
public static final LogicalIndexNavigation NO_OP =
new LogicalIndexNavigation(Collections.emptySet(), true, true, 0, 0);

// a set of labels to navigate over:
// LAST(A.price, 3) => this is a navigation over rows with label A, so labels = {A}
// LAST(Union.price, 3) => this is a navigation over rows matching a union variable Union, so for
// SUBSET Union = (A, B, C), we have labels = {A, B, C}
// LAST(price, 3) => this is a navigation over "universal pattern variable", which is effectively
// over all rows, no matter the assigned labels. In such case labels = {}
private final Set<Integer> labels;
private final boolean last;
private final boolean running;
private final int logicalOffset;
private final int physicalOffset;

public LogicalIndexNavigation(
Set<Integer> labels, boolean last, boolean running, int logicalOffset, int physicalOffset) {
this.labels = requireNonNull(labels, "labels is null");
this.last = last;
this.running = running;
checkArgument(logicalOffset >= 0, "logical offset must be >= 0, actual: %s", logicalOffset);
this.logicalOffset = logicalOffset;
this.physicalOffset = physicalOffset;
}

public Set<Integer> getLabels() {
return labels;
}

public boolean isLast() {
return last;
}

public int getLogicalOffset() {
return logicalOffset;
}

public int getPhysicalOffset() {
return physicalOffset;
}

/**
* This method is used when evaluating labels during pattern matching, computing row pattern
* measures, and computing SKIP TO position after finding a match. Search is limited up to the
* current row in case of running semantics and to the entire match in case of final semantics.
*
* @return position within partition, or -1 if matching position was not found
*/
public int resolvePosition(
int currentRow, ArrayView matchedLabels, int searchStart, int searchEnd, int patternStart) {
checkArgument(
currentRow >= patternStart && currentRow < patternStart + matchedLabels.length(),
"current row is out of bounds of the match");

int relativePosition;
if (last) {
int start;
if (running) {
start = currentRow - patternStart;
} else {
start = matchedLabels.length() - 1;
}
relativePosition = findLastAndBackwards(start, matchedLabels);
} else {
relativePosition = findFirstAndForward(matchedLabels);
}
return adjustPosition(relativePosition, patternStart, searchStart, searchEnd);
}

// LAST(A.price, 3): find the last occurrence of label "A" and go 3 occurrences backwards
private int findLastAndBackwards(int searchStart, ArrayView matchedLabels) {
int position = searchStart + 1;
int found = 0;
while (found <= logicalOffset && position > 0) {
position--;
if (labels.isEmpty() || labels.contains(matchedLabels.get(position))) {
found++;
}
}
if (found == logicalOffset + 1) {
return position;
}
return -1;
}

// FIRST(A.price, 3): find the first occurrence of label "A" and go 3 occurrences forward
private int findFirstAndForward(ArrayView matchedLabels) {
int position = -1;
int found = 0;
while (found <= logicalOffset && position < matchedLabels.length() - 1) {
position++;
if (labels.isEmpty() || labels.contains(matchedLabels.get(position))) {
found++;
}
}
if (found == logicalOffset + 1) {
return position;
}
return -1;
}

// adjust position by patternStart to reflect position within partition
// adjust position by physical offset: skip a certain number of rows, regardless of labels
// check if the new position is within partition bound by: partitionStart - inclusive,
// partitionEnd - exclusive
private int adjustPosition(
int relativePosition, int patternStart, int searchStart, int searchEnd) {
if (relativePosition == -1) {
return -1;
}
int start = relativePosition + patternStart;
int target = start + physicalOffset;
if (target < searchStart || target >= searchEnd) {
return -1;
}
return target;
}

// for thread equivalence
public LogicalIndexNavigation withoutLogicalOffset() {
return withLogicalOffset(0);
}

public LogicalIndexNavigation withLogicalOffset(int logicalOffset) {
return new LogicalIndexNavigation(labels, last, running, logicalOffset, physicalOffset);
}

public LogicalIndexNavigation withoutPhysicalOffset() {
return withPhysicalOffset(0);
}

public LogicalIndexNavigation withPhysicalOffset(int physicalOffset) {
return new LogicalIndexNavigation(labels, last, running, logicalOffset, physicalOffset);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LogicalIndexNavigation that = (LogicalIndexNavigation) o;
return last == that.last
&& running == that.running
&& logicalOffset == that.logicalOffset
&& physicalOffset == that.physicalOffset
&& Objects.equals(labels, that.labels);
}

@Override
public int hashCode() {
return Objects.hash(labels, last, running, logicalOffset, physicalOffset);
}
}
Loading

0 comments on commit fda9edb

Please sign in to comment.