1 /***********************************************************************
2  * Copyright (c) 2017-2024 IBM
3  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Apache License, Version 2.0
6  * which accompanies this distribution and is available at
7  * http://www.opensource.org/licenses/apache2.0.php.
8  ***********************************************************************/
9 
10 package org.locationtech.geomesa.cassandra.data
11 
12 import com.datastax.driver.core.{Row, Statement}
13 import org.geotools.api.filter.Filter
14 import org.locationtech.geomesa.cassandra.utils.CassandraBatchScan
15 import org.locationtech.geomesa.index.api.QueryPlan.{FeatureReducer, ResultsToFeatures}
16 import org.locationtech.geomesa.index.api.{FilterStrategy, QueryPlan}
17 import org.locationtech.geomesa.index.utils.Explainer
18 import org.locationtech.geomesa.index.utils.Reprojection.QueryReferenceSystems
19 import org.locationtech.geomesa.index.utils.ThreadManagement.Timeout
20 import org.locationtech.geomesa.utils.collection.CloseableIterator
21 
/**
  * Query plan for the Cassandra data store. The raw scan results of every plan are driver `Row`s.
  */
sealed trait CassandraQueryPlan extends QueryPlan[CassandraDataStore] {

  override type Results = Row

  /** Tables associated with this plan, listed in the explain output */
  def tables: Seq[String]

  /** Statements associated with this plan; the explain output shows their count and the first five */
  def ranges: Seq[Statement]

  /** Number of threads configured for this plan */
  def numThreads: Int

  /**
    * Filter reported as "Client-side filter" in the explain output.
    *
    * Note: this member exists for explain logging only. Per the original note, the filter itself
    * is applied in entriesToFeatures (presumably the row-to-feature conversion - TODO confirm
    * the current name of that step).
    *
    * @return the client-side filter, if there is one
    */
  def clientSideFilter: Option[Filter]

  /**
    * Describes this plan by delegating to the shared routine in the companion object.
    *
    * @param explainer explainer that receives the description
    * @param prefix text placed in front of the plan header line
    */
  override def explain(explainer: Explainer, prefix: String): Unit = {
    CassandraQueryPlan.explain(this, explainer, prefix)
  }
}
39 
object CassandraQueryPlan {

  /**
    * Writes a description of a plan to an explainer, as one nested level containing the plan class
    * name, its tables, the number of ranges plus the first five of them, the client-side filter
    * and the reducer. Absent filter/reducer values are written as "none".
    *
    * @param plan plan to describe
    * @param explainer explainer that receives the description
    * @param prefix text placed in front of the "Plan: ..." header line
    */
  def explain(plan: CassandraQueryPlan, explainer: Explainer, prefix: String): Unit = {
    import org.locationtech.geomesa.filter.filterToString

    // NOTE(review): these are local defs rather than vals on purpose - each piece is only computed
    // at the point where its message is evaluated, exactly as if it were inlined in the
    // interpolation (this matters if Explainer takes its messages by-name - confirm against Explainer)
    def planName: String = plan.getClass.getName
    def tableList: String = plan.tables.mkString(", ")
    def rangeCount: Int = plan.ranges.size
    def rangePreview: String = plan.ranges.take(5).map(statement => statement.toString).mkString(", ")
    def filterText: String = plan.clientSideFilter match {
      case Some(filter) => filterToString(filter)
      case None         => "none"
    }
    def reduceText: AnyRef = plan.reducer.getOrElse("none")

    explainer.pushLevel(s"${prefix}Plan: $planName")
    explainer(s"Tables: $tableList")
    explainer(s"Ranges ($rangeCount): $rangePreview")
    explainer(s"Client-side filter: $filterText")
    explainer(s"Reduce: $reduceText")
    explainer.popLevel()
  }
}
51 
/**
  * Plan that will not actually scan anything: it has no tables, no ranges and no threads, and
  * `scan` returns an empty iterator without touching the data store.
  *
  * @param filter filter strategy associated with this plan
  * @param reducer optional reducer, none by default
  */
case class EmptyPlan(filter: FilterStrategy, reducer: Option[FeatureReducer] = None) extends CassandraQueryPlan {

  // nothing to scan
  override val tables: Seq[String] = Nil
  override val ranges: Seq[Statement] = Nil
  override val numThreads: Int = 0

  // no filtering, ordering, limit or reprojection is configured
  override val clientSideFilter: Option[Filter] = Option.empty
  override val sort: Option[Seq[(String, Boolean)]] = Option.empty
  override val maxFeatures: Option[Int] = Option.empty
  override val projection: Option[QueryReferenceSystems] = Option.empty

  override val resultsToFeatures: ResultsToFeatures[Row] = ResultsToFeatures.empty

  /**
    * @param ds data store (not used)
    * @return an empty iterator
    */
  override def scan(ds: CassandraDataStore): CloseableIterator[Row] = CloseableIterator.empty
}
64 
/**
  * Plan that executes a set of statements against Cassandra through a `CassandraBatchScan`.
  *
  * @param filter filter strategy associated with this plan
  * @param tables tables associated with this plan (shown in the explain output)
  * @param ranges statements handed to the batch scan
  * @param numThreads number of threads handed to the batch scan
  * @param clientSideFilter client-side filter, only used for explain logging (see note below)
  * @param resultsToFeatures conversion from result rows to features
  * @param reducer optional reducer
  * @param sort optional sort, as pairs of a name and a boolean flag
  * @param maxFeatures optional limit on the number of features
  * @param projection optional reprojection
  */
case class StatementPlan(
    filter: FilterStrategy,
    tables: Seq[String],
    ranges: Seq[Statement],
    numThreads: Int,
    // note: filter is applied in entriesToFeatures, this is just for explain logging
    clientSideFilter: Option[Filter],
    resultsToFeatures: ResultsToFeatures[Row],
    reducer: Option[FeatureReducer],
    sort: Option[Seq[(String, Boolean)]],
    maxFeatures: Option[Int],
    projection: Option[QueryReferenceSystems]
  ) extends CassandraQueryPlan {

  /**
    * Runs this plan's statements using the session of the given data store.
    *
    * @param ds data store supplying the session and the optional query timeout
    * @return iterator over the resulting rows
    */
  override def scan(ds: CassandraDataStore): CloseableIterator[Row] = {
    val session = ds.session
    // the query timeout is optional in the data store configuration
    val timeout: Option[Timeout] = ds.config.queries.timeout.map(relative => Timeout(relative))
    CassandraBatchScan(this, session, ranges, numThreads, timeout)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
37 83092 1534 - 1585 Apply org.locationtech.geomesa.cassandra.data.CassandraQueryPlan.explain CassandraQueryPlan.explain(this, explainer, prefix)
43 83093 1789 - 1790 Literal <nosymbol> ""
43 83095 1828 - 1829 Literal <nosymbol> ""
43 83094 1798 - 1805 Literal <nosymbol> "Plan: "
43 83097 1787 - 1829 Apply scala.StringContext.s scala.StringContext.apply("", "Plan: ", "").s(prefix, plan.getClass().getName())
43 83096 1806 - 1827 Apply java.lang.Class.getName plan.getClass().getName()
43 83098 1767 - 1830 Apply org.locationtech.geomesa.index.utils.Explainer.pushLevel explainer.pushLevel(scala.StringContext.apply("", "Plan: ", "").s(prefix, plan.getClass().getName()))
44 83099 1847 - 1856 Literal <nosymbol> "Tables: "
44 83101 1857 - 1883 Apply scala.collection.TraversableOnce.mkString plan.tables.mkString(", ")
44 83100 1884 - 1885 Literal <nosymbol> ""
44 83103 1835 - 1886 Apply org.locationtech.geomesa.index.utils.Explainer.apply explainer.apply(scala.StringContext.apply("Tables: ", "").s(plan.tables.mkString(", ")))
44 83102 1845 - 1885 Apply scala.StringContext.s scala.StringContext.apply("Tables: ", "").s(plan.tables.mkString(", "))
45 83105 1930 - 1934 Literal <nosymbol> "): "
45 83104 1903 - 1912 Literal <nosymbol> "Ranges ("
45 83107 1913 - 1929 Select scala.collection.SeqLike.size plan.ranges.size
45 83106 1986 - 1987 Literal <nosymbol> ""
45 83109 1901 - 1987 Apply scala.StringContext.s scala.StringContext.apply("Ranges (", "): ", "").s(plan.ranges.size, plan.ranges.take(5).map[String, Seq[String]](((x$1: com.datastax.driver.core.Statement) => x$1.toString()))(collection.this.Seq.canBuildFrom[String]).mkString(", "))
45 83108 1935 - 1985 Apply scala.collection.TraversableOnce.mkString plan.ranges.take(5).map[String, Seq[String]](((x$1: com.datastax.driver.core.Statement) => x$1.toString()))(collection.this.Seq.canBuildFrom[String]).mkString(", ")
45 83110 1891 - 1988 Apply org.locationtech.geomesa.index.utils.Explainer.apply explainer.apply(scala.StringContext.apply("Ranges (", "): ", "").s(plan.ranges.size, plan.ranges.take(5).map[String, Seq[String]](((x$1: com.datastax.driver.core.Statement) => x$1.toString()))(collection.this.Seq.canBuildFrom[String]).mkString(", ")))
46 83111 2005 - 2026 Literal <nosymbol> "Client-side filter: "
46 83113 2027 - 2086 Apply scala.Option.getOrElse plan.clientSideFilter.map[String]({ ((filter: org.geotools.api.filter.Filter) => org.locationtech.geomesa.filter.`package`.filterToString(filter)) }).getOrElse[String]("none")
46 83112 2087 - 2088 Literal <nosymbol> ""
46 83115 1993 - 2089 Apply org.locationtech.geomesa.index.utils.Explainer.apply explainer.apply(scala.StringContext.apply("Client-side filter: ", "").s(plan.clientSideFilter.map[String]({ ((filter: org.geotools.api.filter.Filter) => org.locationtech.geomesa.filter.`package`.filterToString(filter)) }).getOrElse[String]("none")))
46 83114 2003 - 2088 Apply scala.StringContext.s scala.StringContext.apply("Client-side filter: ", "").s(plan.clientSideFilter.map[String]({ ((filter: org.geotools.api.filter.Filter) => org.locationtech.geomesa.filter.`package`.filterToString(filter)) }).getOrElse[String]("none"))
47 83117 2147 - 2148 Literal <nosymbol> ""
47 83116 2106 - 2115 Literal <nosymbol> "Reduce: "
47 83119 2104 - 2148 Apply scala.StringContext.s scala.StringContext.apply("Reduce: ", "").s(plan.reducer.getOrElse[Object]("none"))
47 83118 2116 - 2146 Apply scala.Option.getOrElse plan.reducer.getOrElse[Object]("none")
47 83120 2094 - 2149 Apply org.locationtech.geomesa.index.utils.Explainer.apply explainer.apply(scala.StringContext.apply("Reduce: ", "").s(plan.reducer.getOrElse[Object]("none")))
48 83121 2154 - 2174 Apply org.locationtech.geomesa.index.utils.Explainer.popLevel explainer.popLevel()
48 83122 2172 - 2172 Literal <nosymbol> ()
54 83123 2378 - 2387 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
55 83124 2428 - 2437 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
56 83125 2471 - 2472 Literal <nosymbol> 0
57 83126 2523 - 2527 Select scala.None scala.None
58 83127 2587 - 2610 TypeApply org.locationtech.geomesa.index.api.QueryPlan.ResultsToFeatures.empty org.locationtech.geomesa.index.api.QueryPlan.ResultsToFeatures.empty[com.datastax.driver.core.Row]
59 83128 2665 - 2669 Select scala.None scala.None
60 83129 2712 - 2716 Select scala.None scala.None
61 83130 2776 - 2780 Select scala.None scala.None
62 83131 2851 - 2874 TypeApply org.locationtech.geomesa.utils.collection.CloseableIterator.empty org.locationtech.geomesa.utils.collection.CloseableIterator.empty[Nothing]
80 83133 3477 - 3483 Select org.locationtech.geomesa.cassandra.data.StatementPlan.ranges StatementPlan.this.ranges
80 83132 3465 - 3475 Select org.locationtech.geomesa.cassandra.data.CassandraDataStore.session ds.session
80 83135 3527 - 3540 Apply org.locationtech.geomesa.index.utils.ThreadManagement.Timeout.apply org.locationtech.geomesa.index.utils.ThreadManagement.Timeout.apply(relative)
80 83134 3485 - 3495 Select org.locationtech.geomesa.cassandra.data.StatementPlan.numThreads StatementPlan.this.numThreads
80 83137 3440 - 3542 Apply org.locationtech.geomesa.cassandra.utils.CassandraBatchScan.apply org.locationtech.geomesa.cassandra.utils.CassandraBatchScan.apply(this, ds.session, StatementPlan.this.ranges, StatementPlan.this.numThreads, ds.config.queries.timeout.map[org.locationtech.geomesa.index.utils.ThreadManagement.Timeout]({ ((relative: Long) => org.locationtech.geomesa.index.utils.ThreadManagement.Timeout.apply(relative)) }))
80 83136 3497 - 3541 Apply scala.Option.map ds.config.queries.timeout.map[org.locationtech.geomesa.index.utils.ThreadManagement.Timeout]({ ((relative: Long) => org.locationtech.geomesa.index.utils.ThreadManagement.Timeout.apply(relative)) })