Commit b8b07031 authored by Casualet's avatar Casualet

remove code

parent f61f3307
...@@ -428,14 +428,12 @@ ProxyState::getConn() const ...@@ -428,14 +428,12 @@ ProxyState::getConn() const
} }
const std::unique_ptr<Connect> & const std::unique_ptr<Connect> &
ProxyState::getEConn() const ProxyState::getEConn() const {
{
return e_conn; return e_conn;
} }
static void static void
embeddedTHDCleanup(THD *thd) embeddedTHDCleanup(THD *thd) {
{
thd->clear_data_list(); thd->clear_data_list();
--thread_count; --thread_count;
// thd->unlink() is called in by THD destructor // thd->unlink() is called in by THD destructor
...@@ -446,9 +444,9 @@ embeddedTHDCleanup(THD *thd) ...@@ -446,9 +444,9 @@ embeddedTHDCleanup(THD *thd)
delete thd; delete thd;
} }
/*???*/
void void
ProxyState::safeCreateEmbeddedTHD() ProxyState::safeCreateEmbeddedTHD() {
{
//THD is created by new, so there is no Lex or other things in it. //THD is created by new, so there is no Lex or other things in it.
THD *thd = static_cast<THD *>(create_embedded_thd(0)); THD *thd = static_cast<THD *>(create_embedded_thd(0));
assert(thd); assert(thd);
...@@ -458,8 +456,7 @@ ProxyState::safeCreateEmbeddedTHD() ...@@ -458,8 +456,7 @@ ProxyState::safeCreateEmbeddedTHD()
return; return;
} }
void ProxyState::dumpTHDs() void ProxyState::dumpTHDs(){
{
for (auto &it : thds) { for (auto &it : thds) {
it.release(); it.release();
} }
...@@ -485,8 +482,10 @@ std::string Delta::tableNameFromType(TableType table_type) const { ...@@ -485,8 +482,10 @@ std::string Delta::tableNameFromType(TableType table_type) const {
/*insert into the metadata table (kv) and then apply this to childrens*/ /*insert into the metadata table (kv) and then apply this to childrens*/
static static
bool create_delta_helper(CreateDelta* this_is, const std::unique_ptr<Connect> &e_conn,Delta::TableType table_type, std::string table_name, const DBMeta &meta_me, const DBMeta &parent, bool create_delta_helper(CreateDelta* this_is, const std::unique_ptr<Connect> &e_conn,
const AbstractMetaKey &meta_me_key, const unsigned int parent_id){ Delta::TableType table_type, std::string table_name,
const DBMeta &meta_me, const DBMeta &parent,
const AbstractMetaKey &meta_me_key, const unsigned int parent_id){
/*serialize the metame and meta_me_key, and escape*/ /*serialize the metame and meta_me_key, and escape*/
const std::string &child_serial = meta_me.serialize(parent); const std::string &child_serial = meta_me.serialize(parent);
assert(0 == meta_me.getDatabaseID()); assert(0 == meta_me.getDatabaseID());
......
...@@ -776,14 +776,14 @@ prettyPrintQuery(const std::string &query) ...@@ -776,14 +776,14 @@ prettyPrintQuery(const std::string &query)
<< "QUERY: " << COLOR_END << terminalEscape(query) << std::endl; << "QUERY: " << COLOR_END << terminalEscape(query) << std::endl;
} }
/*use environment variable to load configuration*/
SECURITY_RATING SECURITY_RATING
determineSecurityRating() determineSecurityRating(){
{
const char *const secure = getenv("SECURE_CRYPTDB"); const char *const secure = getenv("SECURE_CRYPTDB");
if (secure && equalsIgnoreCase("FALSE", secure)) { if (secure && equalsIgnoreCase("FALSE", secure)) {
return SECURITY_RATING::BEST_EFFORT; return SECURITY_RATING::BEST_EFFORT;
} }
return SECURITY_RATING::SENSITIVE; return SECURITY_RATING::SENSITIVE;
} }
......
...@@ -16,23 +16,19 @@ ...@@ -16,23 +16,19 @@
#include <parser/sql_utils.hh> #include <parser/sql_utils.hh>
#include <parser/mysql_type_metadata.hh> #include <parser/mysql_type_metadata.hh>
//为什么需要这个? //thread local variable
__thread ProxyState *thread_ps = NULL; __thread ProxyState *thread_ps = NULL;
class WrapperState { //wrapperstate contains proxystate. one per client.
class WrapperState{
WrapperState(const WrapperState &other); WrapperState(const WrapperState &other);
WrapperState &operator=(const WrapperState &rhs); WrapperState &operator=(const WrapperState &rhs);
KillZone kill_zone; KillZone kill_zone;
public: public:
std::string last_query; std::string last_query;
std::string default_db; std::string default_db;
std::ofstream * PLAIN_LOG; std::ofstream * PLAIN_LOG;
WrapperState() {} WrapperState() {}
~WrapperState() {}
const std::unique_ptr<QueryRewrite> &getQueryRewrite() const { const std::unique_ptr<QueryRewrite> &getQueryRewrite() const {
assert(this->qr); assert(this->qr);
return this->qr; return this->qr;
...@@ -46,36 +42,8 @@ public: ...@@ -46,36 +42,8 @@ public:
void setKillZone(const KillZone &kz) { void setKillZone(const KillZone &kz) {
kill_zone = kz; kill_zone = kz;
} }
std::unique_ptr<ProxyState> ps; std::unique_ptr<ProxyState> ps;
// we are running cryptdb in a threaded environment without proper
// locking; this leads to crashes during onion adjustment unless we
// take some minimal precautions
// > everytime we process a query we take a reference to the SchemaInfo
// so that we know the same SchemaInfo (and it's children) will be
// available on the backend for Deltaz; this handles the following
// known bad cases.
// a. thread A marks the cache as stale; thread B sees that it is
// stale and updates the cache; thread A crashes while
// trying to do onion adjustment
// b. we must take the reference at the same time we get the schema
// from the cache, otherwise ...
// + thread A takes the reference to SchemaInfo when the cache is
// already stale, now he ``gets'' his SchemaInfo; because the
// cache is stale the second SchemaInfo is a new object and the
// reference doesn't protect it. now when thread B gets his
// SchemaInfo the cache is still stale so he deletes the only
// reference to the SchemaInfo thread A is using
// + this case only applies if we aren't using the lock on each
// function (connect, disonnect, rewrite, envoi); thread A
// takes a reference to SchemaInfo then before he can ``get''
// the SchemaInfo thread B stales the cache. now thread A
// gets the SchemaInfo and his reference doesn't protect it.
// when thread C gets his SchemaInfo the cache is stale so
// he deletes the only reference to the SchemaInfo thread A
// is using
std::vector<SchemaInfoRef> schema_info_refs; std::vector<SchemaInfoRef> schema_info_refs;
private: private:
std::unique_ptr<QueryRewrite> qr; std::unique_ptr<QueryRewrite> qr;
}; };
...@@ -83,19 +51,19 @@ private: ...@@ -83,19 +51,19 @@ private:
//commented //commented
//static Timer t; //static Timer t;
//static EDBProxy * cl = NULL;
static SharedProxyState * shared_ps = NULL; static SharedProxyState * shared_ps = NULL;
//this ensures that only one client can call connect or next
static pthread_mutex_t big_lock; static pthread_mutex_t big_lock;
static bool EXECUTE_QUERIES = true; static bool EXECUTE_QUERIES = true;
static std::string TRAIN_QUERY =""; static std::string TRAIN_QUERY ="";
static bool LOG_PLAIN_QUERIES = false;
static std::string PLAIN_BASELOG = ""; static std::string PLAIN_BASELOG = "";
static int counter = 0;
static std::map<std::string, WrapperState*> clients; static std::map<std::string, WrapperState*> clients;
...@@ -103,23 +71,20 @@ static void ...@@ -103,23 +71,20 @@ static void
returnResultSet(lua_State *L, const ResType &res); returnResultSet(lua_State *L, const ResType &res);
static Item_null * static Item_null *
make_null(const std::string &name = "") make_null(const std::string &name = ""){
{
char *const n = current_thd->strdup(name.c_str()); char *const n = current_thd->strdup(name.c_str());
return new Item_null(n); return new Item_null(n);
} }
static std::string static std::string
xlua_tolstring(lua_State *const l, int index) xlua_tolstring(lua_State *const l, int index){
{
size_t len; size_t len;
char const *const s = lua_tolstring(l, index, &len); char const *const s = lua_tolstring(l, index, &len);
return std::string(s, len); return std::string(s, len);
} }
static void static void
xlua_pushlstring(lua_State *const l, const std::string &s) xlua_pushlstring(lua_State *const l, const std::string &s){
{
lua_pushlstring(l, s.data(), s.length()); lua_pushlstring(l, s.data(), s.length());
} }
...@@ -128,11 +93,12 @@ connect(lua_State *const L) { ...@@ -128,11 +93,12 @@ connect(lua_State *const L) {
//TODO: added, why test here? //TODO: added, why test here?
assert(test64bitZZConversions()); assert(test64bitZZConversions());
ANON_REGION(__func__, &perf_cg); // ANON_REGION(__func__, &perf_cg);
//Only one client can connect at a time
scoped_lock l(&big_lock); scoped_lock l(&big_lock);
assert(0 == mysql_thread_init()); assert(0 == mysql_thread_init());
//来自lua脚本的参数. //fetch lua paramaters.
const std::string client = xlua_tolstring(L, 1); const std::string client = xlua_tolstring(L, 1);
const std::string server = xlua_tolstring(L, 2); const std::string server = xlua_tolstring(L, 2);
const uint port = luaL_checkint(L, 3); const uint port = luaL_checkint(L, 3);
...@@ -143,82 +109,24 @@ connect(lua_State *const L) { ...@@ -143,82 +109,24 @@ connect(lua_State *const L) {
ConnectionInfo const ci = ConnectionInfo(server, user, psswd, port); ConnectionInfo const ci = ConnectionInfo(server, user, psswd, port);
assert(clients.end() == clients.find(client)); assert(clients.end() == clients.find(client));
//一个用户进入的时候, 新建一个wrapper, 走的时候删除. //one wrapperstate per client. This is deleted when the client leaves
clients[client] = new WrapperState(); clients[client] = new WrapperState();
// Is it the first connection? /*shared_ps is created as the first client comes in, and it is preserved.
* each proxystate takes a const reference of the sharedproxy state, which
* contains the schemainfo
*/
if (!shared_ps) { if (!shared_ps) {
std::cerr << "starting proxy\n"; const std::string &mkey = "113341234";
LOG(wrapper) << "connect " << client << "; "
<< "server = " << server << ":" << port << "; "
<< "user = " << user << "; "
<< "password = " << psswd;
const std::string &false_str = "FALSE";
const std::string &mkey = "113341234"; // XXX do not change as
// it's used for tpcc exps
//const std::string &mkey = "887766908";
shared_ps = shared_ps =
new SharedProxyState(ci, embed_dir, mkey, new SharedProxyState(ci, embed_dir, mkey,
determineSecurityRating()); determineSecurityRating());
//may need to do training
const char *ev = getenv("TRAIN_QUERY");
if (ev) {
std::cerr << "Deprecated query training!" << std::endl;
}
ev = getenv("EXECUTE_QUERIES");
if (ev && equalsIgnoreCase(false_str, ev)) {
LOG(wrapper) << "do not execute queries";
EXECUTE_QUERIES = false;
} else {
LOG(wrapper) << "execute queries";
EXECUTE_QUERIES = true;
}
ev = getenv("LOAD_ENC_TABLES");
if (ev) {
std::cerr << "No current functionality for loading tables\n";
}
ev = getenv("LOG_PLAIN_QUERIES");
if (ev) {
std::string logPlainQueries = std::string(ev);
if (logPlainQueries != "") {
LOG_PLAIN_QUERIES = true;
PLAIN_BASELOG = logPlainQueries;
logPlainQueries += StringFromVal(++counter);
assert_s(system(("rm -f" + logPlainQueries + "; touch " + logPlainQueries).c_str()) >= 0, "failed to rm -f and touch " + logPlainQueries);
std::ofstream * const PLAIN_LOG =
new std::ofstream(logPlainQueries, std::ios_base::app);
LOG(wrapper) << "proxy logs plain queries at " << logPlainQueries;
assert_s(PLAIN_LOG != NULL, "could not create file " + logPlainQueries);
clients[client]->PLAIN_LOG = PLAIN_LOG;
} else {
LOG_PLAIN_QUERIES = false;
}
}
} else {
if (LOG_PLAIN_QUERIES) {
std::string logPlainQueries =
PLAIN_BASELOG+StringFromVal(++counter);
assert_s(system((" touch " + logPlainQueries).c_str()) >= 0, "failed to remove or touch plain log");
LOG(wrapper) << "proxy logs plain queries at " << logPlainQueries;
std::ofstream * const PLAIN_LOG =
new std::ofstream(logPlainQueries, std::ios_base::app);
assert_s(PLAIN_LOG != NULL, "could not create file " + logPlainQueries);
clients[client]->PLAIN_LOG = PLAIN_LOG;
}
} }
clients[client]->ps = clients[client]->ps =
std::unique_ptr<ProxyState>(new ProxyState(*shared_ps)); std::unique_ptr<ProxyState>(new ProxyState(*shared_ps));
// We don't want to use the THD from the previous connection // We don't want to use the THD from the previous connection
// if such is even possible... // if such is even possible...
clients[client]->ps->safeCreateEmbeddedTHD(); clients[client]->ps->safeCreateEmbeddedTHD();
return 0; return 0;
} }
...@@ -227,28 +135,23 @@ disconnect(lua_State *const L) { ...@@ -227,28 +135,23 @@ disconnect(lua_State *const L) {
ANON_REGION(__func__, &perf_cg); ANON_REGION(__func__, &perf_cg);
scoped_lock l(&big_lock); scoped_lock l(&big_lock);
assert(0 == mysql_thread_init()); assert(0 == mysql_thread_init());
const std::string client = xlua_tolstring(L, 1); const std::string client = xlua_tolstring(L, 1);
if (clients.find(client) == clients.end()) { if (clients.find(client) == clients.end()) {
return 0; return 0;
} }
LOG(wrapper) << "disconnect " << client; LOG(wrapper) << "disconnect " << client;
auto ws = clients[client]; auto ws = clients[client];
clients[client] = NULL; clients[client] = NULL;
thread_ps = NULL; thread_ps = NULL;
delete ws; delete ws;
clients.erase(client); clients.erase(client);
mysql_thread_end(); mysql_thread_end();
return 0; return 0;
} }
static int static int
rewrite(lua_State *const L) { rewrite(lua_State *const L) {
ANON_REGION(__func__, &perf_cg); // ANON_REGION(__func__, &perf_cg);
scoped_lock l(&big_lock); scoped_lock l(&big_lock);
assert(0 == mysql_thread_init()); assert(0 == mysql_thread_init());
...@@ -266,7 +169,6 @@ rewrite(lua_State *const L) { ...@@ -266,7 +169,6 @@ rewrite(lua_State *const L) {
const std::string &query = xlua_tolstring(L, 2); const std::string &query = xlua_tolstring(L, 2);
const unsigned long long _thread_id = const unsigned long long _thread_id =
strtoull(xlua_tolstring(L, 3).c_str(), NULL, 10); strtoull(xlua_tolstring(L, 3).c_str(), NULL, 10);
//std::cout<<query<<std::endl;
//this is not used?? //this is not used??
c_wrapper->last_query = query; c_wrapper->last_query = query;
if (EXECUTE_QUERIES) { if (EXECUTE_QUERIES) {
...@@ -496,8 +398,7 @@ next(lua_State *const L) { ...@@ -496,8 +398,7 @@ next(lua_State *const L) {
} }
static void static void
returnResultSet(lua_State *const L, const ResType &rd) returnResultSet(lua_State *const L, const ResType &rd) {
{
TEST_GenericPacketException(true == rd.ok, "something bad happened"); TEST_GenericPacketException(true == rd.ok, "something bad happened");
lua_pushinteger(L, rd.affected_rows); lua_pushinteger(L, rd.affected_rows);
...@@ -548,6 +449,7 @@ cryptdb_lib[] = { ...@@ -548,6 +449,7 @@ cryptdb_lib[] = {
F(rewrite), F(rewrite),
F(next), F(next),
{ 0, 0 }, { 0, 0 },
#undef F
}; };
extern "C" int lua_cryptdb_init(lua_State * L); extern "C" int lua_cryptdb_init(lua_State * L);
......
...@@ -41,21 +41,16 @@ LOG_GROUPS(__temp_m) ...@@ -41,21 +41,16 @@ LOG_GROUPS(__temp_m)
class cryptdb_logger : public std::stringstream { class cryptdb_logger : public std::stringstream {
public: public:
cryptdb_logger(log_group g, const char *filearg, uint linearg, const char *fnarg) cryptdb_logger(log_group g, const char *filearg, uint linearg, const char *fnarg)
: m(mask(g)), file(filearg), line(linearg), func(fnarg) : m(mask(g)), file(filearg), line(linearg), func(fnarg){
{
} }
~cryptdb_logger(){
~cryptdb_logger()
{
if (enable_mask & m) if (enable_mask & m)
std::cerr << file << ":" << line std::cerr << file << ":" << line
<< " (" << func << "): " << " (" << func << "): "
<< str() << std::endl; << str() << std::endl;
} }
static void static void
enable(log_group g) enable(log_group g){
{
if (g == log_group::log_all) if (g == log_group::log_all)
enable_mask = ~0ULL; enable_mask = ~0ULL;
else else
...@@ -63,8 +58,7 @@ class cryptdb_logger : public std::stringstream { ...@@ -63,8 +58,7 @@ class cryptdb_logger : public std::stringstream {
} }
static void static void
disable(log_group g) disable(log_group g){
{
if (g == log_group::log_all) if (g == log_group::log_all)
enable_mask = 0; enable_mask = 0;
else else
...@@ -72,28 +66,24 @@ class cryptdb_logger : public std::stringstream { ...@@ -72,28 +66,24 @@ class cryptdb_logger : public std::stringstream {
} }
static bool static bool
enabled(log_group g) enabled(log_group g){
{
return enable_mask & mask(g); return enable_mask & mask(g);
} }
static uint64_t static uint64_t
mask(log_group g) mask(log_group g){
{
return 1ULL << ((int) g); return 1ULL << ((int) g);
} }
static std::string static std::string
getConf() getConf(){
{
std::stringstream ss; std::stringstream ss;
ss << enable_mask; ss << enable_mask;
return ss.str(); return ss.str();
} }
static void static void
setConf(std::string conf) setConf(std::string conf){
{
std::stringstream ss(conf); std::stringstream ss(conf);
ss >> enable_mask; ss >> enable_mask;
} }
...@@ -103,11 +93,8 @@ class cryptdb_logger : public std::stringstream { ...@@ -103,11 +93,8 @@ class cryptdb_logger : public std::stringstream {
const char *file; const char *file;
uint line; uint line;
const char *func; const char *func;
static uint64_t enable_mask; static uint64_t enable_mask;
}; };
#define LOG(g) \ #define LOG(g) \
(cryptdb_logger(log_group::log_ ## g, __FILE__, __LINE__, __func__)) (cryptdb_logger(log_group::log_ ## g, __FILE__, __LINE__, __func__))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment