Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ private LensConfConstants() throws LensException {
* The Constant DRIVER_CLASSES.
*/
public static final String DRIVER_CLASSES = SERVER_PFX + "drivers";

/**
* The Constant DRIVER_SELECTOR_CLASS.
*/
public static final String DRIVER_SELECTOR_CLASS = SERVER_PFX + "driver.selector.class";
/**
* The Constant ACCEPTOR_CLASSES.
*/
public static final String ACCEPTOR_CLASSES = SERVER_PFX + "acceptors";
/**
* The Constant SERVICE_NAMES.
*/
Expand Down Expand Up @@ -835,15 +842,15 @@ public static String getWSFilterImplConfKey(String filterName) {
public static final boolean DEFAULT_ENABLE_QUERY_METRICS = false;

/**
* Key used to hold value of unique id for query metrics. This wont be passed by user, will be generated and set.
* This is to pass unique id for query across the code flow.
* Key used to hold value of unique id for query metrics. This wont be passed by user, will be generated and set. This
* is to pass unique id for query across the code flow.
*/
public static final String QUERY_METRIC_UNIQUE_ID_CONF_KEY = QUERY_PFX + "metric.unique.id";

/**
* Key used to hold value query metric name in the stack. This wont be passed by user, will be generated and set.
* When each query looked at by driver, the metric needs to be different for each driver. This name capture the stack
* from which driver the code reached there.
* Key used to hold value query metric name in the stack. This wont be passed by user, will be generated and set. When
* each query looked at by driver, the metric needs to be different for each driver. This name capture the stack from
* which driver the code reached there.
*/
public static final String QUERY_METRIC_DRIVER_STACK_NAME = QUERY_PFX + "metric.driver.stack.name";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public class QueryExecutionServiceImpl extends LensService implements QueryExecu
/**
* The accepted queries.
*/
private PriorityBlockingQueue<QueryContext> acceptedQueries = new PriorityBlockingQueue<QueryContext>();
private PriorityBlockingQueue<QueryContext> queuedQueries = new PriorityBlockingQueue<QueryContext>();

/**
* The launched queries.
Expand Down Expand Up @@ -262,7 +262,23 @@ public QueryExecutionServiceImpl(CLIService cliService) throws LensException {
/**
* Initialize query acceptors and listeners.
*/
private void initializeQueryAcceptorsAndListeners() {
private void initializeQueryAcceptors() throws LensException {
String[] acceptorClasses = conf.getStrings(LensConfConstants.ACCEPTOR_CLASSES);
if (acceptorClasses != null) {
for (String acceptorClass : acceptorClasses) {
try {
Class<?> clazz = Class.forName(acceptorClass);
QueryAcceptor acceptor = (QueryAcceptor) clazz.newInstance();
queryAcceptors.add(acceptor);
} catch (Exception e) {
LOG.warn("Could not load the acceptor:" + acceptorClass, e);
throw new LensException("Could not load acceptor" + acceptorClass, e);
}
}
}
}

private void initializeListeners() {
if (conf.getBoolean(LensConfConstants.QUERY_STATE_LOGGER_ENABLED, true)) {
getEventService().addListenerForType(new QueryStatusLogger(), StatusChange.class);
LOG.info("Registered query state logger");
Expand All @@ -281,7 +297,6 @@ private void initializeQueryAcceptorsAndListeners() {
* @throws LensException the lens exception
*/
private void loadDriversAndSelector() throws LensException {
conf.get(LensConfConstants.DRIVER_CLASSES);
String[] driverClasses = conf.getStrings(LensConfConstants.DRIVER_CLASSES);
if (driverClasses != null) {
for (String driverClass : driverClasses) {
Expand All @@ -304,7 +319,17 @@ private void loadDriversAndSelector() throws LensException {
} else {
throw new LensException("No drivers specified");
}
driverSelector = new MinQueryCostSelector();
try {
Class<? extends DriverSelector> driverSelectorClass = conf.getClass(LensConfConstants.DRIVER_SELECTOR_CLASS,
MinQueryCostSelector.class,
DriverSelector.class);
LOG.info("Using driver selector class: " + driverSelectorClass.getCanonicalName());
driverSelector = driverSelectorClass.newInstance();
} catch (Exception e) {
throw new LensException("Couldn't instantiate driver selector class. Class name: "
+ conf.get(LensConfConstants.DRIVER_SELECTOR_CLASS) + ". Please supply a valid value for "
+ LensConfConstants.DRIVER_SELECTOR_CLASS);
}
}

protected LensEventService getEventService() {
Expand Down Expand Up @@ -462,7 +487,7 @@ public void run() {
LOG.info("Starting QuerySubmitter thread");
while (!pausedForTest && !stopped && !querySubmitter.isInterrupted()) {
try {
QueryContext ctx = acceptedQueries.take();
QueryContext ctx = queuedQueries.take();
synchronized (ctx) {
if (ctx.getStatus().getStatus().equals(Status.QUEUED)) {
LOG.info("Launching query:" + ctx.getUserQuery());
Expand Down Expand Up @@ -608,7 +633,7 @@ private void updateFinishedQuery(QueryContext ctx, QueryStatus before) {
// before would be null in case of server restart
if (before != null) {
if (before.getStatus().equals(Status.QUEUED)) {
acceptedQueries.remove(ctx);
queuedQueries.remove(ctx);
} else {
launchedQueries.remove(ctx);
}
Expand Down Expand Up @@ -844,7 +869,12 @@ public void run() {
public synchronized void init(HiveConf hiveConf) {
super.init(hiveConf);
this.conf = hiveConf;
initializeQueryAcceptorsAndListeners();
try {
initializeQueryAcceptors();
} catch (LensException e) {
throw new IllegalStateException("Could not load acceptors");
}
initializeListeners();
try {
loadDriversAndSelector();
} catch (LensException e) {
Expand Down Expand Up @@ -873,7 +903,7 @@ private void initalizeFinishedQueryStore(Configuration conf) {
module.addSerializer(ColumnDescriptor.class, new JsonSerializer<ColumnDescriptor>() {
@Override
public void serialize(ColumnDescriptor columnDescriptor, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("name", columnDescriptor.getName());
jsonGenerator.writeStringField("comment", columnDescriptor.getComment());
Expand Down Expand Up @@ -997,8 +1027,8 @@ public Thread newThread(Runnable r) {
private static final String PARALLEL_CALL_GAUGE = "PARALLEL_ESTIMATE";

/**
* Rewrite the query for each driver, and estimate query cost for the rewritten queries.
* Finally, select the driver using driver selector.
* Rewrite the query for each driver, and estimate query cost for the rewritten queries. Finally, select the driver
* using driver selector.
*
* @param ctx query context
* @throws LensException the lens exception
Expand Down Expand Up @@ -1089,8 +1119,8 @@ private void rewriteAndSelect(final AbstractQueryContext ctx) throws LensExcepti
}

/**
* Chains driver specific rewrite and estimate of the query in a single runnable, which can be
* processed in a background thread
* Chains driver specific rewrite and estimate of the query in a single runnable, which can be processed in a
* background thread
*/
public class RewriteEstimateRunnable implements Runnable {
@Getter
Expand Down Expand Up @@ -1231,11 +1261,11 @@ private LensResultSet getResultset(QueryHandle queryHandle) throws LensException
if (resultSet == null) {
if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) {
resultSets
.put(queryHandle,
new LensPersistentResult(
ctx.getQueryOutputFormatter().getMetadata(),
ctx.getQueryOutputFormatter().getFinalOutputPath(),
ctx.getQueryOutputFormatter().getNumRows()));
.put(queryHandle,
new LensPersistentResult(
ctx.getQueryOutputFormatter().getMetadata(),
ctx.getQueryOutputFormatter().getFinalOutputPath(),
ctx.getQueryOutputFormatter().getNumRows()));
} else if (allQueries.get(queryHandle).isResultAvailableInDriver()) {
resultSet = allQueries.get(queryHandle).getSelectedDriver().fetchResultSet(allQueries.get(queryHandle));
resultSets.put(queryHandle, resultSet);
Expand Down Expand Up @@ -1471,7 +1501,7 @@ private QueryHandle executeAsyncInternal(LensSessionHandle sessionHandle, QueryC
ctx.setLensSessionIdentifier(sessionHandle.getPublicId().toString());
QueryStatus before = ctx.getStatus();
ctx.setStatus(new QueryStatus(0.0, QueryStatus.Status.QUEUED, "Query is queued", false, null, null));
acceptedQueries.add(ctx);
queuedQueries.add(ctx);
allQueries.put(ctx.getQueryHandle(), ctx);
fireStatusChangeEvent(ctx, ctx.getStatus(), before);
LOG.info("Returning handle " + ctx.getQueryHandle().getHandleId());
Expand Down Expand Up @@ -2135,7 +2165,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
switch (ctx.getStatus().getStatus()) {
case NEW:
case QUEUED:
acceptedQueries.add(ctx);
queuedQueries.add(ctx);
break;
case LAUNCHED:
case RUNNING:
Expand Down Expand Up @@ -2289,7 +2319,7 @@ Collection<LensDriver> getDrivers() {

@Override
public long getQueuedQueriesCount() {
return acceptedQueries.size();
return queuedQueries.size();
}

@Override
Expand Down Expand Up @@ -2321,6 +2351,7 @@ protected void handleDriverSessionStart(DriverEvent event) {
LOG.warn("Lens session went away for sessionid:" + lensSession);
return;
}

try {
LensSessionImpl session = getSession(sessionHandle);
acquire(sessionHandle);
Expand Down Expand Up @@ -2353,7 +2384,8 @@ protected void handleDriverSessionStart(DriverEvent event) {

/**
* Add session's resources to selected driver if needed
* @param ctx the query context
*
* @param ctx QueryContext for executinf queries
* @throws LensException
*/
protected void addSessionResourcesToDriver(final AbstractQueryContext ctx) {
Expand Down Expand Up @@ -2412,17 +2444,18 @@ protected void addSessionResourcesToDriver(final AbstractQueryContext ctx) {

/**
* Add resources to hive driver, returning resources which failed to be added
* @param resources collection of resources intented to be added to hive driver
*
* @param resources collection of resources intented to be added to hive driver
* @param sessionHandle
* @param hiveDriver
* @return resources which could not be added to hive driver
*/
private List<ResourceEntry> addResources(Collection<ResourceEntry> resources,
LensSessionHandle sessionHandle,
HiveDriver hiveDriver) {
LensSessionHandle sessionHandle,
HiveDriver hiveDriver) {
List<ResourceEntry> failedResources = new ArrayList<ResourceEntry>();
for (ResourceEntry res : resources) {
try{
try {
addSingleResourceToHive(hiveDriver, res, sessionHandle);
} catch (LensException exc) {
failedResources.add(res);
Expand All @@ -2434,7 +2467,7 @@ private List<ResourceEntry> addResources(Collection<ResourceEntry> resources,
}

private void addSingleResourceToHive(HiveDriver driver, ResourceEntry res,
LensSessionHandle sessionHandle) throws LensException {
LensSessionHandle sessionHandle) throws LensException {
String sessionIdentifier = sessionHandle.getPublicId().toString();
String uri = res.getLocation();
// Hive doesn't and URIs starting with file:/ correctly, so we have to change it to file:///
Expand Down
12 changes: 12 additions & 0 deletions lens-server/src/main/resources/lensserver-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@
<description>Drivers enabled for this lens server instance</description>
</property>

<property>
<name>lens.server.driver.selector.class</name>
<value>org.apache.lens.server.api.driver.MinQueryCostSelector</value>
<description>Class for selecting best driver given the query context</description>
</property>

<property>
<name>lens.server.query.acceptors</name>
<value></value>
<description>Query Acceptors configuredsk</description>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo?

</property>

<property>
<name>lens.server.servicenames</name>
<value>session,query,metastore,scheduler,quota</value>
Expand Down