Skip to content

Commit

Permalink
review comments part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
shizy818 committed Dec 4, 2024
1 parent 8962580 commit a033bc9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -158,14 +159,15 @@ private void updateChunkAndPageStatisticsFromTvLists() {
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
MergeSortTvListIterator timeValuePairIterator =
new MergeSortTvListIterator(dataType, encoding, floatPrecision, tvLists);
int[] tvListOffsets = timeValuePairIterator.getTVListOffsets();
while (timeValuePairIterator.hasNextTimeValuePair()) {
int[] tvListOffsetsBeforeNext = timeValuePairIterator.getLastTVListOffsets();
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
if (!isPointDeleted(tvPair.getTimestamp(), deletionList, deleteCursor)) {
if (cnt % MAX_NUMBER_OF_POINTS_IN_PAGE == 0) {
Statistics stats = Statistics.getStatsByType(dataType);
pageStatisticsList.add(stats);
pageOffsetsList.add(tvListOffsets);
pageOffsetsList.add(
Arrays.copyOf(tvListOffsetsBeforeNext, tvListOffsetsBeforeNext.length));
}

Statistics pageStatistics = pageStatisticsList.get(pageStatisticsList.size() - 1);
Expand Down Expand Up @@ -202,7 +204,6 @@ private void updateChunkAndPageStatisticsFromTvLists() {
// do nothing
}
pageStatistics.setEmpty(false);
tvListOffsets = timeValuePairIterator.getTVListOffsets();
cnt++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private boolean isOutOfMemPageBounds() {
int[] currTvListOffsets =
((MergeSortTvListIterator) timeValuePairIterator).getTVListOffsets();
for (int i = 0; i < pageEndOffsets.length; i++) {
if (currTvListOffsets[i] > pageEndOffsets[i]) {
if (currTvListOffsets[i] >= pageEndOffsets[i]) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,20 @@ public class MergeSortTvListIterator implements IPointReader {
private TSEncoding encoding;
private int floatPrecision = -1;

private int selectedTVListIndex = -1;
private TimeValuePair currentTvPair;

private final int[] tvListOffsets;
private final int[] lastTvListOffsets;

public MergeSortTvListIterator(TSDataType tsDataType, List<TVList> tvLists) {
this.tsDataType = tsDataType;
tvListIterators = new ArrayList<>();
for (TVList tvList : tvLists) {
tvListIterators.add(tvList.iterator());
}
this.tvListOffsets = new int[tvLists.size()];
this.lastTvListOffsets = new int[tvLists.size()];
}

public MergeSortTvListIterator(
Expand All @@ -53,9 +59,9 @@ public MergeSortTvListIterator(
this.floatPrecision = floatPrecision;
}

private int getSelectedTVListIndex() {
private void prepareNextRow() {
long time = Long.MAX_VALUE;
int selectedTVListIndex = -1;
selectedTVListIndex = -1;
for (int i = 0; i < tvListIterators.size(); i++) {
TVList.TVListIterator iterator = tvListIterators.get(i);
TimeValuePair currTvPair = null;
Expand All @@ -69,30 +75,36 @@ private int getSelectedTVListIndex() {
selectedTVListIndex = i;
}
}
return selectedTVListIndex;
}

@Override
public boolean hasNextTimeValuePair() {
boolean hasNext = false;
int selectedTVListIndex = getSelectedTVListIndex();
if (selectedTVListIndex >= 0) {
currentTvPair = tvListIterators.get(selectedTVListIndex).next();
hasNext = true;

// call next to skip identical timestamp in other iterators
for (int i = 0; i < tvListIterators.size(); i++) {
TimeValuePair tvPair = tvListIterators.get(i).current();
if (tvPair != null && tvPair.getTimestamp() == currentTvPair.getTimestamp()) {
tvListIterators.get(i).next();
}
}
if (selectedTVListIndex == -1) {
prepareNextRow();
}
return hasNext;
return selectedTVListIndex >= 0 && selectedTVListIndex < tvListIterators.size();
}

@Override
public TimeValuePair nextTimeValuePair() {
if (!hasNextTimeValuePair()) {
return null;
}
lastTvListOffsets[selectedTVListIndex] = tvListIterators.get(selectedTVListIndex).getIndex();
currentTvPair = tvListIterators.get(selectedTVListIndex).next();
tvListOffsets[selectedTVListIndex] = tvListIterators.get(selectedTVListIndex).getIndex();

// call next to skip identical timestamp in other iterators
for (int i = 0; i < tvListIterators.size(); i++) {
TimeValuePair tvPair = tvListIterators.get(i).current();
if (tvPair != null && tvPair.getTimestamp() == currentTvPair.getTimestamp()) {
lastTvListOffsets[i] = tvListIterators.get(i).getIndex();
tvListIterators.get(i).next();
tvListOffsets[i] = tvListIterators.get(i).getIndex();
}
}

selectedTVListIndex = -1;
return currentTimeValuePair();
}

Expand Down Expand Up @@ -122,6 +134,7 @@ public TimeValuePair currentTimeValuePair() {

@Override
public long getUsedMemorySize() {
// not used
return 0;
}

Expand All @@ -130,18 +143,18 @@ public void close() throws IOException {
tvListIterators.clear();
}

public int[] getLastTVListOffsets() {
return lastTvListOffsets;
}

public int[] getTVListOffsets() {
int size = tvListIterators.size();
int[] tvListOffsets = new int[size];
for (int i = 0; i < size; i++) {
tvListOffsets[i] = tvListIterators.get(i).getIndex();
}
return tvListOffsets;
}

public void setTVListOffsets(int[] tvListOffsets) {
for (int i = 0; i < tvListIterators.size(); i++) {
tvListIterators.get(i).setIndex(tvListOffsets[i]);
this.tvListOffsets[i] = tvListOffsets[i];
}
}
}

0 comments on commit a033bc9

Please sign in to comment.